refactor: enhance agent kickoff preparation by separating common logic

Updated the Agent class to introduce a new private method, `_prepare_kickoff`, that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts.
This commit is contained in:
lorenzejay
2026-01-14 14:27:39 -08:00
parent 13dc7e25e0
commit b7a13e15ff

View File

@@ -92,6 +92,7 @@ if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import PlatformAppOrAction
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.types import LLMMessage
@@ -1574,34 +1575,24 @@ class Agent(BaseAgent):
)
return None
def kickoff(
def _prepare_kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a Flow (inside an event loop), this method
automatically returns a coroutine that the Flow framework will await,
making it work seamlessly in both sync and async contexts.
This method handles all the common preparation logic shared between
kickoff() and kickoff_async(), including tool processing, prompt building,
executor creation, and input formatting.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
Or a coroutine if called from within an event loop.
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
"""
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
@@ -1672,10 +1663,10 @@ class Agent(BaseAgent):
i18n=self.i18n,
)
# Format messages
if isinstance(messages, str):
formatted_messages = messages
else:
# Convert list of messages to a single input string
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
@@ -1687,8 +1678,42 @@ class Agent(BaseAgent):
"tools": render_text_description_and_args(parsed_tools),
}
return executor, inputs, agent_info, parsed_tools
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a Flow (inside an event loop), this method
automatically returns a coroutine that the Flow framework will await,
making it work seamlessly in both sync and async contexts.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
Or a coroutine if called from within an event loop.
"""
# Magic auto-async: return coroutine for Flow to await
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
try:
# Emit started event for backward compatibility with LiteAgent listeners
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
@@ -1698,7 +1723,6 @@ class Agent(BaseAgent):
),
)
# Execute and build output
output = self._execute_and_build_output(executor, inputs, response_format)
if self.guardrail is not None:
@@ -1950,93 +1974,11 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
"""
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools and self.tools is not None:
self.tools.extend(platform_tools)
if self.mcps:
mcps = self.get_mcp_tools(self.mcps)
if mcps and self.tools is not None:
self.tools.extend(mcps)
# Prepare tools
raw_tools: list[BaseTool] = self.tools or []
parsed_tools = parse_tools(raw_tools)
# Build agent_info for backward-compatible event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
# Build prompt for standalone execution
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# Prepare stop words
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# Get RPM limit function
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
# Create the executor for standalone mode (no crew, no task)
executor = AgentExecutor(
task=None,
crew=None,
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
i18n=self.i18n,
)
if isinstance(messages, str):
formatted_messages = messages
else:
# Convert list of messages to a single input string
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
# Build the input dict for the executor
inputs = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
try:
# Emit started event for backward compatibility with LiteAgent listeners
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
@@ -2046,7 +1988,6 @@ class Agent(BaseAgent):
),
)
# Execute asynchronously using invoke_async
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)