From b7a13e15ff63204431d2b5f19f65291b11783466 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 14 Jan 2026 14:27:39 -0800 Subject: [PATCH] refactor: enhance agent kickoff preparation by separating common logic Updated the Agent class to introduce a new private method that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts. --- lib/crewai/src/crewai/agent/core.py | 151 +++++++++------------------- 1 file changed, 46 insertions(+), 105 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 571777b3a..d815b15ed 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -92,6 +92,7 @@ if TYPE_CHECKING: from crewai.agents.agent_builder.base_agent import PlatformAppOrAction from crewai.task import Task from crewai.tools.base_tool import BaseTool + from crewai.tools.structured_tool import CrewStructuredTool from crewai.utilities.types import LLMMessage @@ -1574,34 +1575,24 @@ class Agent(BaseAgent): ) return None - def kickoff( + def _prepare_kickoff( self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: - """ - Execute the agent with the given messages using the AgentExecutor. + ) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]: + """Prepare common setup for kickoff execution. - This method provides standalone agent execution without requiring a Crew. - It supports tools, response formatting, and guardrails. - - When called from within a Flow (inside an event loop), this method - automatically returns a coroutine that the Flow framework will await, - making it work seamlessly in both sync and async contexts. + This method handles all the common preparation logic shared between + kickoff() and kickoff_async(), including tool processing, prompt building, + executor creation, and input formatting. Args: messages: Either a string query or a list of message dictionaries. - If a string is provided, it will be converted to a user message. - If a list is provided, each dict should have 'role' and 'content' keys. response_format: Optional Pydantic model for structured output. Returns: - LiteAgentOutput: The result of the agent execution. - Or a coroutine if called from within an event loop. + Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution. """ - if is_inside_event_loop(): - return self.kickoff_async(messages, response_format) - # Process platform apps and MCP tools if self.apps: platform_tools = self.get_platform_tools(self.apps) @@ -1672,10 +1663,10 @@ class Agent(BaseAgent): i18n=self.i18n, ) + # Format messages if isinstance(messages, str): formatted_messages = messages else: - # Convert list of messages to a single input string formatted_messages = "\n".join( str(msg.get("content", "")) for msg in messages if msg.get("content") ) @@ -1687,8 +1678,42 @@ class Agent(BaseAgent): "tools": render_text_description_and_args(parsed_tools), } + return executor, inputs, agent_info, parsed_tools + + def kickoff( + self, + messages: str | list[LLMMessage], + response_format: type[Any] | None = None, + ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: + """ + Execute the agent with the given messages using the AgentExecutor. + + This method provides standalone agent execution without requiring a Crew. + It supports tools, response formatting, and guardrails. + + When called from within a Flow (inside an event loop), this method + automatically returns a coroutine that the Flow framework will await, + making it work seamlessly in both sync and async contexts. + + Args: + messages: Either a string query or a list of message dictionaries. + If a string is provided, it will be converted to a user message. + If a list is provided, each dict should have 'role' and 'content' keys. + response_format: Optional Pydantic model for structured output. + + Returns: + LiteAgentOutput: The result of the agent execution. + Or a coroutine if called from within an event loop. + """ + # Magic auto-async: return coroutine for Flow to await + if is_inside_event_loop(): + return self.kickoff_async(messages, response_format) + + executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( + messages, response_format + ) + try: - # Emit started event for backward compatibility with LiteAgent listeners crewai_event_bus.emit( self, event=LiteAgentExecutionStartedEvent( @@ -1698,7 +1723,6 @@ class Agent(BaseAgent): ), ) - # Execute and build output output = self._execute_and_build_output(executor, inputs, response_format) if self.guardrail is not None: @@ -1950,93 +1974,11 @@ class Agent(BaseAgent): Returns: LiteAgentOutput: The result of the agent execution. """ - # Process platform apps and MCP tools - if self.apps: - platform_tools = self.get_platform_tools(self.apps) - if platform_tools and self.tools is not None: - self.tools.extend(platform_tools) - if self.mcps: - mcps = self.get_mcp_tools(self.mcps) - if mcps and self.tools is not None: - self.tools.extend(mcps) - - # Prepare tools - raw_tools: list[BaseTool] = self.tools or [] - parsed_tools = parse_tools(raw_tools) - - # Build agent_info for backward-compatible event emission - agent_info = { - "id": self.id, - "role": self.role, - "goal": self.goal, - "backstory": self.backstory, - "tools": raw_tools, - "verbose": self.verbose, - } - - # Build prompt for standalone execution - prompt = Prompts( - agent=self, - has_tools=len(raw_tools) > 0, - i18n=self.i18n, - use_system_prompt=self.use_system_prompt, - system_template=self.system_template, - prompt_template=self.prompt_template, - response_template=self.response_template, - ).task_execution() - - # Prepare stop words - stop_words = [self.i18n.slice("observation")] - if self.response_template: - stop_words.append( - self.response_template.split("{{ .Response }}")[1].strip() - ) - - # Get RPM limit function - rpm_limit_fn = ( - self._rpm_controller.check_or_wait if self._rpm_controller else None + executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( + messages, response_format ) - # Create the executor for standalone mode (no crew, no task) - executor = AgentExecutor( - task=None, - crew=None, - llm=cast(BaseLLM, self.llm), - agent=self, - prompt=prompt, - max_iter=self.max_iter, - tools=parsed_tools, - tools_names=get_tool_names(parsed_tools), - stop_words=stop_words, - tools_description=render_text_description_and_args(parsed_tools), - tools_handler=self.tools_handler, - original_tools=raw_tools, - step_callback=self.step_callback, - function_calling_llm=self.function_calling_llm, - respect_context_window=self.respect_context_window, - request_within_rpm_limit=rpm_limit_fn, - callbacks=[TokenCalcHandler(self._token_process)], - response_model=response_format, - i18n=self.i18n, - ) - - if isinstance(messages, str): - formatted_messages = messages - else: - # Convert list of messages to a single input string - formatted_messages = "\n".join( - str(msg.get("content", "")) for msg in messages if msg.get("content") - ) - - # Build the input dict for the executor - inputs = { - "input": formatted_messages, - "tool_names": get_tool_names(parsed_tools), - "tools": render_text_description_and_args(parsed_tools), - } - try: - # Emit started event for backward compatibility with LiteAgent listeners crewai_event_bus.emit( self, event=LiteAgentExecutionStartedEvent( @@ -2046,7 +1988,6 @@ class Agent(BaseAgent): ), ) - # Execute asynchronously using invoke_async output = await self._execute_and_build_output_async( executor, inputs, response_format )