ensure executors work inside a flow due to flow in flow async structure

This commit is contained in:
lorenzejay
2026-01-14 14:23:10 -08:00
parent 5cef85c643
commit 13dc7e25e0
4 changed files with 323 additions and 10 deletions

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from collections.abc import Callable, Coroutine, Sequence
import shutil
import subprocess
import time
@@ -70,6 +70,7 @@ from crewai.security.fingerprint import Fingerprint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
@@ -1577,13 +1578,17 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a Flow (inside an event loop), this method
automatically returns a coroutine that the Flow framework will await,
making it work seamlessly in both sync and async contexts.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
@@ -1592,7 +1597,11 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
Or a coroutine if called from within an event loop.
"""
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
@@ -1738,8 +1747,70 @@ class Agent(BaseAgent):
"""
import json
# Execute the agent
result = executor.invoke(inputs)
# Execute the agent (this is called from sync path, so invoke returns dict)
result = cast(dict[str, Any], executor.invoke(inputs))
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object.
This is the async version of _execute_and_build_output that uses
invoke_async() for native async execution within event loops.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent asynchronously
result = await executor.invoke_async(inputs)
raw_output = result.get("output", "")
# Handle response format conversion
@@ -1866,7 +1937,9 @@ class Agent(BaseAgent):
"""
Execute the agent asynchronously with the given messages.
This is the async version of the kickoff method.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
called from within an async Flow method.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -1877,4 +1950,131 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await asyncio.to_thread(self.kickoff, messages, response_format)
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools and self.tools is not None:
self.tools.extend(platform_tools)
if self.mcps:
mcps = self.get_mcp_tools(self.mcps)
if mcps and self.tools is not None:
self.tools.extend(mcps)
# Prepare tools
raw_tools: list[BaseTool] = self.tools or []
parsed_tools = parse_tools(raw_tools)
# Build agent_info for backward-compatible event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
# Build prompt for standalone execution
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# Prepare stop words
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# Get RPM limit function
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
# Create the executor for standalone mode (no crew, no task)
executor = AgentExecutor(
task=None,
crew=None,
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
i18n=self.i18n,
)
if isinstance(messages, str):
formatted_messages = messages
else:
# Convert list of messages to a single input string
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
# Build the input dict for the executor
inputs = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
try:
# Emit started event for backward compatibility with LiteAgent listeners
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
# Execute asynchronously using invoke_async
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
from collections.abc import Callable, Coroutine
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -37,6 +37,7 @@ from crewai.utilities.agent_utils import (
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -182,7 +183,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
else self.stop
)
)
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -453,9 +453,99 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:
"""Execute agent with given inputs.
When called from within an existing event loop (e.g., inside a Flow),
this method returns a coroutine that should be awaited. The Flow
framework handles this automatically.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output, or a coroutine if inside an event loop.
"""
# Magic auto-async: if inside event loop, return coroutine for Flow to await
if is_inside_event_loop():
return self.invoke_async(inputs)
self._ensure_flow_initialized()
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
self.kickoff()
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
except AssertionError:
fail_text = Text()
fail_text.append("", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent asynchronously with given inputs.
This method is designed for use within async contexts, such as when
the agent is called from within an async Flow method. It uses
kickoff_async() directly instead of running in a separate thread.
Args:
inputs: Input dictionary containing prompt variables.
@@ -496,7 +586,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
inputs.get("ask_for_human_input", False)
)
self.kickoff()
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
formatted_answer = self.state.current_answer

View File

@@ -1579,6 +1579,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
else method(*args, **kwargs)
)
# Auto-await coroutines from sync methods (enables agent.kickoff() inside flows)
if asyncio.iscoroutine(result):
result = await result
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
import json
import re
@@ -54,6 +55,23 @@ console = Console()
_MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+")
def is_inside_event_loop() -> bool:
"""Check if code is currently running inside an asyncio event loop.
This is used to detect when code is being called from within an async context
(e.g., inside a Flow). In such cases, callers should return a coroutine
instead of executing synchronously to avoid nested event loop errors.
Returns:
True if inside a running event loop, False otherwise.
"""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task.