diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py
index 985a0e2c6..f59724343 100644
--- a/lib/crewai/src/crewai/agent/core.py
+++ b/lib/crewai/src/crewai/agent/core.py
@@ -2,7 +2,6 @@ from __future__ import annotations
 
 import asyncio
 from collections.abc import Sequence
-import json
 import shutil
 import subprocess
 import time
@@ -19,6 +18,19 @@ from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
 from typing_extensions import Self
 
 from crewai.a2a.config import A2AConfig
+from crewai.agent.utils import (
+    ahandle_knowledge_retrieval,
+    apply_training_data,
+    build_task_prompt_with_schema,
+    format_task_with_context,
+    get_knowledge_config,
+    handle_knowledge_retrieval,
+    handle_reasoning,
+    prepare_tools,
+    process_tool_results,
+    save_last_messages,
+    validate_max_execution_time,
+)
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.cache.cache_handler import CacheHandler
 from crewai.agents.crew_agent_executor import CrewAgentExecutor
@@ -27,9 +39,6 @@ from crewai.events.types.knowledge_events import (
     KnowledgeQueryCompletedEvent,
     KnowledgeQueryFailedEvent,
     KnowledgeQueryStartedEvent,
-    KnowledgeRetrievalCompletedEvent,
-    KnowledgeRetrievalStartedEvent,
-    KnowledgeSearchQueryFailedEvent,
 )
 from crewai.events.types.memory_events import (
     MemoryRetrievalCompletedEvent,
@@ -37,7 +46,6 @@ from crewai.events.types.memory_events import (
     MemoryRetrievalStartedEvent,
 )
 from crewai.knowledge.knowledge import Knowledge
 from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
-from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
 from crewai.lite_agent import LiteAgent
 from crewai.llms.base_llm import BaseLLM
 from crewai.mcp import (
@@ -61,7 +69,7 @@ from crewai.utilities.agent_utils import (
     render_text_description_and_args,
 )
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
-from crewai.utilities.converter import Converter, generate_model_description
+from crewai.utilities.converter import Converter
 from crewai.utilities.guardrail_types import GuardrailType
 from crewai.utilities.llm_utils import create_llm
 from crewai.utilities.prompts import Prompts
@@ -295,53 +303,15 @@ class Agent(BaseAgent):
             ValueError: If the max execution time is not a positive integer.
             RuntimeError: If the agent execution fails for other reasons.
""" - if self.reasoning: - try: - from crewai.utilities.reasoning_handler import ( - AgentReasoning, - AgentReasoningOutput, - ) - - reasoning_handler = AgentReasoning(task=task, agent=self) - reasoning_output: AgentReasoningOutput = ( - reasoning_handler.handle_agent_reasoning() - ) - - # Add the reasoning plan to the task description - task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}" - except Exception as e: - self._logger.log("error", f"Error during reasoning process: {e!s}") + handle_reasoning(self, task) self._inject_date_to_task(task) if self.tools_handler: self.tools_handler.last_used_tool = None task_prompt = task.prompt() - - # If the task requires output in JSON or Pydantic format, - # append specific instructions to the task prompt to ensure - # that the final answer does not include any code block markers - # Skip this if task.response_model is set, as native structured outputs handle schema automatically - if (task.output_json or task.output_pydantic) and not task.response_model: - # Generate the schema based on the output format - if task.output_json: - schema_dict = generate_model_description(task.output_json) - schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) - task_prompt += "\n" + self.i18n.slice( - "formatted_task_instructions" - ).format(output_format=schema) - - elif task.output_pydantic: - schema_dict = generate_model_description(task.output_pydantic) - schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) - task_prompt += "\n" + self.i18n.slice( - "formatted_task_instructions" - ).format(output_format=schema) - - if context: - task_prompt = self.i18n.slice("task_with_context").format( - task=task_prompt, context=context - ) + task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n) + task_prompt = format_task_with_context(task_prompt, context, self.i18n) if self._is_any_available_memory(): crewai_event_bus.emit( @@ -379,84 +349,20 @@ class Agent(BaseAgent): from_task=task, ), ) - knowledge_config = ( - self.knowledge_config.model_dump() if self.knowledge_config else {} + + knowledge_config = get_knowledge_config(self) + task_prompt = handle_knowledge_retrieval( + self, + task, + task_prompt, + knowledge_config, + self.knowledge.query if self.knowledge else lambda *a, **k: None, + self.crew.query_knowledge if self.crew else lambda *a, **k: None, ) - if self.knowledge or (self.crew and self.crew.knowledge): - crewai_event_bus.emit( - self, - event=KnowledgeRetrievalStartedEvent( - from_task=task, - from_agent=self, - ), - ) - try: - self.knowledge_search_query = self._get_knowledge_search_query( - task_prompt, task - ) - if self.knowledge_search_query: - # Quering agent specific knowledge - if self.knowledge: - agent_knowledge_snippets = self.knowledge.query( - [self.knowledge_search_query], **knowledge_config - ) - if agent_knowledge_snippets: - self.agent_knowledge_context = extract_knowledge_context( - agent_knowledge_snippets - ) - if self.agent_knowledge_context: - task_prompt += self.agent_knowledge_context + prepare_tools(self, tools, task) + task_prompt = apply_training_data(self, task_prompt) - # Quering crew specific knowledge - knowledge_snippets = self.crew.query_knowledge( - [self.knowledge_search_query], **knowledge_config - ) - if knowledge_snippets: - self.crew_knowledge_context = extract_knowledge_context( - knowledge_snippets - ) - if self.crew_knowledge_context: - task_prompt += self.crew_knowledge_context - - crewai_event_bus.emit( - self, - 
-                    event=KnowledgeRetrievalCompletedEvent(
-                        query=self.knowledge_search_query,
-                        from_task=task,
-                        from_agent=self,
-                        retrieved_knowledge=(
-                            (self.agent_knowledge_context or "")
-                            + (
-                                "\n"
-                                if self.agent_knowledge_context
-                                and self.crew_knowledge_context
-                                else ""
-                            )
-                            + (self.crew_knowledge_context or "")
-                        ),
-                    ),
-                )
-            except Exception as e:
-                crewai_event_bus.emit(
-                    self,
-                    event=KnowledgeSearchQueryFailedEvent(
-                        query=self.knowledge_search_query or "",
-                        error=str(e),
-                        from_task=task,
-                        from_agent=self,
-                    ),
-                )
-
-        tools = tools or self.tools or []
-        self.create_agent_executor(tools=tools, task=task)
-
-        if self.crew and self.crew._train:
-            task_prompt = self._training_handler(task_prompt=task_prompt)
-        else:
-            task_prompt = self._use_trained_data(task_prompt=task_prompt)
-
-        # Import agent events locally to avoid circular imports
         from crewai.events.types.agent_events import (
             AgentExecutionCompletedEvent,
             AgentExecutionErrorEvent,
@@ -474,15 +380,8 @@ class Agent(BaseAgent):
                 ),
             )
 
-            # Determine execution method based on timeout setting
+            validate_max_execution_time(self.max_execution_time)
             if self.max_execution_time is not None:
-                if (
-                    not isinstance(self.max_execution_time, int)
-                    or self.max_execution_time <= 0
-                ):
-                    raise ValueError(
-                        "Max Execution time must be a positive integer greater than zero"
-                    )
                 result = self._execute_with_timeout(
                     task_prompt, task, self.max_execution_time
                 )
@@ -490,7 +389,6 @@ class Agent(BaseAgent):
                 result = self._execute_without_timeout(task_prompt, task)
 
         except TimeoutError as e:
-            # Propagate TimeoutError without retry
            crewai_event_bus.emit(
                 self,
                 event=AgentExecutionErrorEvent(
@@ -502,7 +400,6 @@ class Agent(BaseAgent):
             raise e
         except Exception as e:
             if e.__class__.__module__.startswith("litellm"):
-                # Do not retry on litellm errors
                 crewai_event_bus.emit(
                     self,
                     event=AgentExecutionErrorEvent(
@@ -528,23 +425,13 @@ class Agent(BaseAgent):
         if self.max_rpm and self._rpm_controller:
             self._rpm_controller.stop_rpm_counter()
 
-        # If there was any tool in self.tools_results that had result_as_answer
-        # set to True, return the results of the last tool that had
-        # result_as_answer set to True
-        for tool_result in self.tools_results:
-            if tool_result.get("result_as_answer", False):
-                result = tool_result["result"]
+        result = process_tool_results(self, result)
 
         crewai_event_bus.emit(
             self,
             event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
         )
-        self._last_messages = (
-            self.agent_executor.messages.copy()
-            if self.agent_executor and hasattr(self.agent_executor, "messages")
-            else []
-        )
-
+        save_last_messages(self)
         self._cleanup_mcp_clients()
         return result
 
@@ -625,47 +512,15 @@ class Agent(BaseAgent):
             ValueError: If the max execution time is not a positive integer.
             RuntimeError: If the agent execution fails for other reasons.
""" - if self.reasoning: - try: - from crewai.utilities.reasoning_handler import ( - AgentReasoning, - AgentReasoningOutput, - ) - - reasoning_handler = AgentReasoning(task=task, agent=self) - reasoning_output: AgentReasoningOutput = ( - reasoning_handler.handle_agent_reasoning() - ) - - task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}" - except Exception as e: - self._logger.log("error", f"Error during reasoning process: {e!s}") + handle_reasoning(self, task) self._inject_date_to_task(task) if self.tools_handler: self.tools_handler.last_used_tool = None task_prompt = task.prompt() - - if (task.output_json or task.output_pydantic) and not task.response_model: - if task.output_json: - schema_dict = generate_model_description(task.output_json) - schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) - task_prompt += "\n" + self.i18n.slice( - "formatted_task_instructions" - ).format(output_format=schema) - - elif task.output_pydantic: - schema_dict = generate_model_description(task.output_pydantic) - schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) - task_prompt += "\n" + self.i18n.slice( - "formatted_task_instructions" - ).format(output_format=schema) - - if context: - task_prompt = self.i18n.slice("task_with_context").format( - task=task_prompt, context=context - ) + task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n) + task_prompt = format_task_with_context(task_prompt, context, self.i18n) if self._is_any_available_memory(): crewai_event_bus.emit( @@ -705,80 +560,14 @@ class Agent(BaseAgent): from_task=task, ), ) - knowledge_config = ( - self.knowledge_config.model_dump() if self.knowledge_config else {} + + knowledge_config = get_knowledge_config(self) + task_prompt = await ahandle_knowledge_retrieval( + self, task, task_prompt, knowledge_config ) - if self.knowledge or (self.crew and self.crew.knowledge): - crewai_event_bus.emit( - self, - event=KnowledgeRetrievalStartedEvent( - from_task=task, - from_agent=self, - ), - ) - try: - self.knowledge_search_query = self._get_knowledge_search_query( - task_prompt, task - ) - if self.knowledge_search_query: - if self.knowledge: - agent_knowledge_snippets = await self.knowledge.aquery( - [self.knowledge_search_query], **knowledge_config - ) - if agent_knowledge_snippets: - self.agent_knowledge_context = extract_knowledge_context( - agent_knowledge_snippets - ) - if self.agent_knowledge_context: - task_prompt += self.agent_knowledge_context - - knowledge_snippets = await self.crew.aquery_knowledge( - [self.knowledge_search_query], **knowledge_config - ) - if knowledge_snippets: - self.crew_knowledge_context = extract_knowledge_context( - knowledge_snippets - ) - if self.crew_knowledge_context: - task_prompt += self.crew_knowledge_context - - crewai_event_bus.emit( - self, - event=KnowledgeRetrievalCompletedEvent( - query=self.knowledge_search_query, - from_task=task, - from_agent=self, - retrieved_knowledge=( - (self.agent_knowledge_context or "") - + ( - "\n" - if self.agent_knowledge_context - and self.crew_knowledge_context - else "" - ) - + (self.crew_knowledge_context or "") - ), - ), - ) - except Exception as e: - crewai_event_bus.emit( - self, - event=KnowledgeSearchQueryFailedEvent( - query=self.knowledge_search_query or "", - error=str(e), - from_task=task, - from_agent=self, - ), - ) - - tools = tools or self.tools or [] - self.create_agent_executor(tools=tools, task=task) - - if self.crew and self.crew._train: - task_prompt = 
-        else:
-            task_prompt = self._use_trained_data(task_prompt=task_prompt)
+        prepare_tools(self, tools, task)
+        task_prompt = apply_training_data(self, task_prompt)
 
         from crewai.events.types.agent_events import (
             AgentExecutionCompletedEvent,
             AgentExecutionErrorEvent,
@@ -797,14 +586,8 @@ class Agent(BaseAgent):
                 ),
             )
 
+            validate_max_execution_time(self.max_execution_time)
             if self.max_execution_time is not None:
-                if (
-                    not isinstance(self.max_execution_time, int)
-                    or self.max_execution_time <= 0
-                ):
-                    raise ValueError(
-                        "Max Execution time must be a positive integer greater than zero"
-                    )
                 result = await self._aexecute_with_timeout(
                     task_prompt, task, self.max_execution_time
                 )
@@ -848,20 +631,13 @@ class Agent(BaseAgent):
         if self.max_rpm and self._rpm_controller:
             self._rpm_controller.stop_rpm_counter()
 
-        for tool_result in self.tools_results:
-            if tool_result.get("result_as_answer", False):
-                result = tool_result["result"]
+        result = process_tool_results(self, result)
 
         crewai_event_bus.emit(
             self,
             event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
         )
-        self._last_messages = (
-            self.agent_executor.messages.copy()
-            if self.agent_executor and hasattr(self.agent_executor, "messages")
-            else []
-        )
-
+        save_last_messages(self)
         self._cleanup_mcp_clients()
         return result
 
diff --git a/lib/crewai/src/crewai/agent/utils.py b/lib/crewai/src/crewai/agent/utils.py
new file mode 100644
index 000000000..0aea029e9
--- /dev/null
+++ b/lib/crewai/src/crewai/agent/utils.py
@@ -0,0 +1,355 @@
+"""Utility functions for agent task execution.
+
+This module contains shared logic extracted from the Agent's execute_task
+and aexecute_task methods to reduce code duplication.
+"""
+
+from __future__ import annotations
+
+import json
+from typing import TYPE_CHECKING, Any
+
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.types.knowledge_events import (
+    KnowledgeRetrievalCompletedEvent,
+    KnowledgeRetrievalStartedEvent,
+    KnowledgeSearchQueryFailedEvent,
+)
+from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
+from crewai.utilities.converter import generate_model_description
+
+
+if TYPE_CHECKING:
+    from crewai.agent.core import Agent
+    from crewai.task import Task
+    from crewai.tools.base_tool import BaseTool
+    from crewai.utilities.i18n import I18N
+
+
+def handle_reasoning(agent: Agent, task: Task) -> None:
+    """Handle the reasoning process for an agent before task execution.
+
+    Args:
+        agent: The agent performing the task.
+        task: The task to execute.
+    """
+    if not agent.reasoning:
+        return
+
+    try:
+        from crewai.utilities.reasoning_handler import (
+            AgentReasoning,
+            AgentReasoningOutput,
+        )
+
+        reasoning_handler = AgentReasoning(task=task, agent=agent)
+        reasoning_output: AgentReasoningOutput = (
+            reasoning_handler.handle_agent_reasoning()
+        )
+        task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
+    except Exception as e:
+        agent._logger.log("error", f"Error during reasoning process: {e!s}")
+
+
+def build_task_prompt_with_schema(task: Task, task_prompt: str, i18n: I18N) -> str:
+    """Build task prompt with JSON/Pydantic schema instructions if applicable.
+
+    Args:
+        task: The task being executed.
+        task_prompt: The initial task prompt.
+        i18n: Internationalization instance.
+
+    Returns:
+        The task prompt potentially augmented with schema instructions.
+ """ + if (task.output_json or task.output_pydantic) and not task.response_model: + if task.output_json: + schema_dict = generate_model_description(task.output_json) + schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) + task_prompt += "\n" + i18n.slice("formatted_task_instructions").format( + output_format=schema + ) + elif task.output_pydantic: + schema_dict = generate_model_description(task.output_pydantic) + schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2) + task_prompt += "\n" + i18n.slice("formatted_task_instructions").format( + output_format=schema + ) + return task_prompt + + +def format_task_with_context(task_prompt: str, context: str | None, i18n: I18N) -> str: + """Format task prompt with context if provided. + + Args: + task_prompt: The task prompt. + context: Optional context string. + i18n: Internationalization instance. + + Returns: + The task prompt formatted with context if provided. + """ + if context: + return i18n.slice("task_with_context").format(task=task_prompt, context=context) + return task_prompt + + +def get_knowledge_config(agent: Agent) -> dict[str, Any]: + """Get knowledge configuration from agent. + + Args: + agent: The agent instance. + + Returns: + Dictionary of knowledge configuration. + """ + return agent.knowledge_config.model_dump() if agent.knowledge_config else {} + + +def handle_knowledge_retrieval( + agent: Agent, + task: Task, + task_prompt: str, + knowledge_config: dict[str, Any], + query_func: Any, + crew_query_func: Any, +) -> str: + """Handle knowledge retrieval for task execution. + + This function handles both agent-specific and crew-specific knowledge queries. + + Args: + agent: The agent performing the task. + task: The task being executed. + task_prompt: The current task prompt. + knowledge_config: Knowledge configuration dictionary. + query_func: Function to query agent knowledge (sync or async). + crew_query_func: Function to query crew knowledge (sync or async). + + Returns: + The task prompt potentially augmented with knowledge context. 
+ """ + if not (agent.knowledge or (agent.crew and agent.crew.knowledge)): + return task_prompt + + crewai_event_bus.emit( + agent, + event=KnowledgeRetrievalStartedEvent( + from_task=task, + from_agent=agent, + ), + ) + try: + agent.knowledge_search_query = agent._get_knowledge_search_query( + task_prompt, task + ) + if agent.knowledge_search_query: + if agent.knowledge: + agent_knowledge_snippets = query_func( + [agent.knowledge_search_query], **knowledge_config + ) + if agent_knowledge_snippets: + agent.agent_knowledge_context = extract_knowledge_context( + agent_knowledge_snippets + ) + if agent.agent_knowledge_context: + task_prompt += agent.agent_knowledge_context + + knowledge_snippets = crew_query_func( + [agent.knowledge_search_query], **knowledge_config + ) + if knowledge_snippets: + agent.crew_knowledge_context = extract_knowledge_context( + knowledge_snippets + ) + if agent.crew_knowledge_context: + task_prompt += agent.crew_knowledge_context + + crewai_event_bus.emit( + agent, + event=KnowledgeRetrievalCompletedEvent( + query=agent.knowledge_search_query, + from_task=task, + from_agent=agent, + retrieved_knowledge=_combine_knowledge_context(agent), + ), + ) + except Exception as e: + crewai_event_bus.emit( + agent, + event=KnowledgeSearchQueryFailedEvent( + query=agent.knowledge_search_query or "", + error=str(e), + from_task=task, + from_agent=agent, + ), + ) + return task_prompt + + +def _combine_knowledge_context(agent: Agent) -> str: + """Combine agent and crew knowledge contexts into a single string. + + Args: + agent: The agent with knowledge contexts. + + Returns: + Combined knowledge context string. + """ + agent_ctx = agent.agent_knowledge_context or "" + crew_ctx = agent.crew_knowledge_context or "" + separator = "\n" if agent_ctx and crew_ctx else "" + return agent_ctx + separator + crew_ctx + + +def apply_training_data(agent: Agent, task_prompt: str) -> str: + """Apply training data to the task prompt. + + Args: + agent: The agent performing the task. + task_prompt: The task prompt. + + Returns: + The task prompt with training data applied. + """ + if agent.crew and agent.crew._train: + return agent._training_handler(task_prompt=task_prompt) + return agent._use_trained_data(task_prompt=task_prompt) + + +def process_tool_results(agent: Agent, result: Any) -> Any: + """Process tool results, returning result_as_answer if applicable. + + Args: + agent: The agent with tool results. + result: The current result. + + Returns: + The final result, potentially overridden by tool result_as_answer. + """ + for tool_result in agent.tools_results: + if tool_result.get("result_as_answer", False): + result = tool_result["result"] + return result + + +def save_last_messages(agent: Agent) -> None: + """Save the last messages from agent executor. + + Args: + agent: The agent instance. + """ + agent._last_messages = ( + agent.agent_executor.messages.copy() + if agent.agent_executor and hasattr(agent.agent_executor, "messages") + else [] + ) + + +def prepare_tools( + agent: Agent, tools: list[BaseTool] | None, task: Task +) -> list[BaseTool]: + """Prepare tools for task execution and create agent executor. + + Args: + agent: The agent instance. + tools: Optional list of tools. + task: The task being executed. + + Returns: + The list of tools to use. 
+ """ + final_tools = tools or agent.tools or [] + agent.create_agent_executor(tools=final_tools, task=task) + return final_tools + + +def validate_max_execution_time(max_execution_time: int | None) -> None: + """Validate max_execution_time parameter. + + Args: + max_execution_time: The maximum execution time to validate. + + Raises: + ValueError: If max_execution_time is not a positive integer. + """ + if max_execution_time is not None: + if not isinstance(max_execution_time, int) or max_execution_time <= 0: + raise ValueError( + "Max Execution time must be a positive integer greater than zero" + ) + + +async def ahandle_knowledge_retrieval( + agent: Agent, + task: Task, + task_prompt: str, + knowledge_config: dict[str, Any], +) -> str: + """Handle async knowledge retrieval for task execution. + + Args: + agent: The agent performing the task. + task: The task being executed. + task_prompt: The current task prompt. + knowledge_config: Knowledge configuration dictionary. + + Returns: + The task prompt potentially augmented with knowledge context. + """ + if not (agent.knowledge or (agent.crew and agent.crew.knowledge)): + return task_prompt + + crewai_event_bus.emit( + agent, + event=KnowledgeRetrievalStartedEvent( + from_task=task, + from_agent=agent, + ), + ) + try: + agent.knowledge_search_query = agent._get_knowledge_search_query( + task_prompt, task + ) + if agent.knowledge_search_query: + if agent.knowledge: + agent_knowledge_snippets = await agent.knowledge.aquery( + [agent.knowledge_search_query], **knowledge_config + ) + if agent_knowledge_snippets: + agent.agent_knowledge_context = extract_knowledge_context( + agent_knowledge_snippets + ) + if agent.agent_knowledge_context: + task_prompt += agent.agent_knowledge_context + + knowledge_snippets = await agent.crew.aquery_knowledge( + [agent.knowledge_search_query], **knowledge_config + ) + if knowledge_snippets: + agent.crew_knowledge_context = extract_knowledge_context( + knowledge_snippets + ) + if agent.crew_knowledge_context: + task_prompt += agent.crew_knowledge_context + + crewai_event_bus.emit( + agent, + event=KnowledgeRetrievalCompletedEvent( + query=agent.knowledge_search_query, + from_task=task, + from_agent=agent, + retrieved_knowledge=_combine_knowledge_context(agent), + ), + ) + except Exception as e: + crewai_event_bus.emit( + agent, + event=KnowledgeSearchQueryFailedEvent( + query=agent.knowledge_search_query or "", + error=str(e), + from_task=task, + from_agent=agent, + ), + ) + return task_prompt