diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 571777b3a..d815b15ed 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -92,6 +92,7 @@ if TYPE_CHECKING: from crewai.agents.agent_builder.base_agent import PlatformAppOrAction from crewai.task import Task from crewai.tools.base_tool import BaseTool + from crewai.tools.structured_tool import CrewStructuredTool from crewai.utilities.types import LLMMessage @@ -1574,34 +1575,24 @@ class Agent(BaseAgent): ) return None - def kickoff( + def _prepare_kickoff( self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: - """ - Execute the agent with the given messages using the AgentExecutor. + ) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]: + """Prepare common setup for kickoff execution. - This method provides standalone agent execution without requiring a Crew. - It supports tools, response formatting, and guardrails. - - When called from within a Flow (inside an event loop), this method - automatically returns a coroutine that the Flow framework will await, - making it work seamlessly in both sync and async contexts. + This method handles all the common preparation logic shared between + kickoff() and kickoff_async(), including tool processing, prompt building, + executor creation, and input formatting. Args: messages: Either a string query or a list of message dictionaries. - If a string is provided, it will be converted to a user message. - If a list is provided, each dict should have 'role' and 'content' keys. response_format: Optional Pydantic model for structured output. Returns: - LiteAgentOutput: The result of the agent execution. - Or a coroutine if called from within an event loop. + Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution. """ - if is_inside_event_loop(): - return self.kickoff_async(messages, response_format) - # Process platform apps and MCP tools if self.apps: platform_tools = self.get_platform_tools(self.apps) @@ -1672,10 +1663,10 @@ class Agent(BaseAgent): i18n=self.i18n, ) + # Format messages if isinstance(messages, str): formatted_messages = messages else: - # Convert list of messages to a single input string formatted_messages = "\n".join( str(msg.get("content", "")) for msg in messages if msg.get("content") ) @@ -1687,8 +1678,42 @@ class Agent(BaseAgent): "tools": render_text_description_and_args(parsed_tools), } + return executor, inputs, agent_info, parsed_tools + + def kickoff( + self, + messages: str | list[LLMMessage], + response_format: type[Any] | None = None, + ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: + """ + Execute the agent with the given messages using the AgentExecutor. + + This method provides standalone agent execution without requiring a Crew. + It supports tools, response formatting, and guardrails. + + When called from within a Flow (inside an event loop), this method + automatically returns a coroutine that the Flow framework will await, + making it work seamlessly in both sync and async contexts. + + Args: + messages: Either a string query or a list of message dictionaries. + If a string is provided, it will be converted to a user message. + If a list is provided, each dict should have 'role' and 'content' keys. + response_format: Optional Pydantic model for structured output. + + Returns: + LiteAgentOutput: The result of the agent execution. + Or a coroutine if called from within an event loop. + """ + # Magic auto-async: return coroutine for Flow to await + if is_inside_event_loop(): + return self.kickoff_async(messages, response_format) + + executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( + messages, response_format + ) + try: - # Emit started event for backward compatibility with LiteAgent listeners crewai_event_bus.emit( self, event=LiteAgentExecutionStartedEvent( @@ -1698,7 +1723,6 @@ class Agent(BaseAgent): ), ) - # Execute and build output output = self._execute_and_build_output(executor, inputs, response_format) if self.guardrail is not None: @@ -1950,93 +1974,11 @@ class Agent(BaseAgent): Returns: LiteAgentOutput: The result of the agent execution. """ - # Process platform apps and MCP tools - if self.apps: - platform_tools = self.get_platform_tools(self.apps) - if platform_tools and self.tools is not None: - self.tools.extend(platform_tools) - if self.mcps: - mcps = self.get_mcp_tools(self.mcps) - if mcps and self.tools is not None: - self.tools.extend(mcps) - - # Prepare tools - raw_tools: list[BaseTool] = self.tools or [] - parsed_tools = parse_tools(raw_tools) - - # Build agent_info for backward-compatible event emission - agent_info = { - "id": self.id, - "role": self.role, - "goal": self.goal, - "backstory": self.backstory, - "tools": raw_tools, - "verbose": self.verbose, - } - - # Build prompt for standalone execution - prompt = Prompts( - agent=self, - has_tools=len(raw_tools) > 0, - i18n=self.i18n, - use_system_prompt=self.use_system_prompt, - system_template=self.system_template, - prompt_template=self.prompt_template, - response_template=self.response_template, - ).task_execution() - - # Prepare stop words - stop_words = [self.i18n.slice("observation")] - if self.response_template: - stop_words.append( - self.response_template.split("{{ .Response }}")[1].strip() - ) - - # Get RPM limit function - rpm_limit_fn = ( - self._rpm_controller.check_or_wait if self._rpm_controller else None + executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( + messages, response_format ) - # Create the executor for standalone mode (no crew, no task) - executor = AgentExecutor( - task=None, - crew=None, - llm=cast(BaseLLM, self.llm), - agent=self, - prompt=prompt, - max_iter=self.max_iter, - tools=parsed_tools, - tools_names=get_tool_names(parsed_tools), - stop_words=stop_words, - tools_description=render_text_description_and_args(parsed_tools), - tools_handler=self.tools_handler, - original_tools=raw_tools, - step_callback=self.step_callback, - function_calling_llm=self.function_calling_llm, - respect_context_window=self.respect_context_window, - request_within_rpm_limit=rpm_limit_fn, - callbacks=[TokenCalcHandler(self._token_process)], - response_model=response_format, - i18n=self.i18n, - ) - - if isinstance(messages, str): - formatted_messages = messages - else: - # Convert list of messages to a single input string - formatted_messages = "\n".join( - str(msg.get("content", "")) for msg in messages if msg.get("content") - ) - - # Build the input dict for the executor - inputs = { - "input": formatted_messages, - "tool_names": get_tool_names(parsed_tools), - "tools": render_text_description_and_args(parsed_tools), - } - try: - # Emit started event for backward compatibility with LiteAgent listeners crewai_event_bus.emit( self, event=LiteAgentExecutionStartedEvent( @@ -2046,7 +1988,6 @@ class Agent(BaseAgent): ), ) - # Execute asynchronously using invoke_async output = await self._execute_and_build_output_async( executor, inputs, response_format )