diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 868c14344..21b586cd7 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1,8 +1,13 @@ +"""Core agent implementation for the CrewAI framework.""" + from __future__ import annotations import asyncio from collections.abc import Callable, Coroutine, Sequence +import concurrent.futures import contextvars +from datetime import datetime +import json from pathlib import Path import shutil import subprocess @@ -11,8 +16,10 @@ from typing import ( TYPE_CHECKING, Any, Literal, + NoReturn, cast, ) +import warnings from pydantic import ( BaseModel, @@ -44,6 +51,9 @@ from crewai.agents.cache.cache_handler import CacheHandler from crewai.agents.crew_agent_executor import CrewAgentExecutor from crewai.events.event_bus import crewai_event_bus from crewai.events.types.agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, LiteAgentExecutionCompletedEvent, LiteAgentExecutionErrorEvent, LiteAgentExecutionStartedEvent, @@ -58,6 +68,7 @@ from crewai.events.types.memory_events import ( MemoryRetrievalFailedEvent, MemoryRetrievalStartedEvent, ) +from crewai.events.types.skill_events import SkillActivatedEvent from crewai.experimental.agent_executor import AgentExecutor from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource @@ -82,7 +93,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F from crewai.utilities.converter import Converter, ConverterError from crewai.utilities.env import get_env_context from crewai.utilities.guardrail import process_guardrail -from crewai.utilities.guardrail_types import GuardrailType +from crewai.utilities.guardrail_types import GuardrailCallable, GuardrailType from crewai.utilities.llm_utils import create_llm from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult from crewai.utilities.pydantic_schema_utils import generate_model_description @@ -263,13 +274,16 @@ class Agent(BaseAgent): ) @model_validator(mode="before") - def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: # noqa: N805 + @classmethod + def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: + """Merge repository agent config with provided values before validation.""" if v is not None and (from_repository := v.get("from_repository")): return load_agent_from_repository(from_repository) | v return v @model_validator(mode="after") def post_init_setup(self) -> Self: + """Initialize LLM, executor, code tools, and skills after model creation.""" self.llm = create_llm(self.llm) if self.function_calling_llm and not isinstance( self.function_calling_llm, BaseLLM @@ -284,10 +298,7 @@ class Agent(BaseAgent): self.set_skills() - # Handle backward compatibility: convert reasoning=True to planning_config if self.reasoning and self.planning_config is None: - import warnings - warnings.warn( "The 'reasoning' parameter is deprecated. Use 'planning_config=PlanningConfig()' instead.", DeprecationWarning, @@ -305,11 +316,13 @@ class Agent(BaseAgent): return self.planning_config is not None or self.planning def _setup_agent_executor(self) -> None: + """Initialize the agent executor with a default cache handler.""" if not self.cache_handler: self.cache_handler = CacheHandler() self.set_cache_handler(self.cache_handler) def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None: + """Initialize knowledge sources with the agent or crew embedder config.""" try: if self.embedder is None and crew_embedder: self.embedder = crew_embedder @@ -342,8 +355,6 @@ class Agent(BaseAgent): and activated). When provided, avoids redundant discovery per agent. """ from crewai.crew import Crew - from crewai.events.event_bus import crewai_event_bus - from crewai.events.types.skill_events import SkillActivatedEvent if resolved_crew_skills is None: crew_skills: list[Path | SkillModel] | None = ( @@ -421,6 +432,235 @@ class Agent(BaseAgent): and len(tools) > 0 ) + def _prepare_task_execution( + self, + task: Task, + context: str | None, + ) -> str: + """Prepare common setup for task execution shared by sync and async paths. + + Handles reasoning, date injection, prompt building, and memory retrieval. + + Args: + task: Task to execute. + context: Context to execute the task in. + + Returns: + The task prompt after memory retrieval, ready for knowledge lookup. + """ + get_env_context() + if self.executor_class is not AgentExecutor: + handle_reasoning(self, task) + + self._inject_date_to_task(task) + + if self.tools_handler: + self.tools_handler.last_used_tool = None + + task_prompt = task.prompt() + task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n) + task_prompt = format_task_with_context(task_prompt, context, self.i18n) + return self._retrieve_memory_context(task, task_prompt) + + def _finalize_task_prompt( + self, + task_prompt: str, + tools: list[BaseTool] | None, + task: Task, + ) -> str: + """Apply skill context, tool preparation, and training data to the task prompt. + + Args: + task_prompt: The task prompt after memory and knowledge retrieval. + tools: Tools to use for the task. + task: Task to execute. + + Returns: + The fully prepared task prompt. + """ + task_prompt = append_skill_context(self, task_prompt) + prepare_tools(self, tools, task) + + return apply_training_data(self, task_prompt) + + def _retrieve_memory_context(self, task: Task, task_prompt: str) -> str: + """Retrieve memory context and append it to the task prompt. + + Args: + task: The task being executed. + task_prompt: The current task prompt. + + Returns: + The task prompt, potentially augmented with memory context. + """ + if not self._is_any_available_memory(): + return task_prompt + + crewai_event_bus.emit( + self, + event=MemoryRetrievalStartedEvent( + task_id=str(task.id) if task else None, + source_type="agent", + from_agent=self, + from_task=task, + ), + ) + + start_time = time.time() + memory = "" + + try: + unified_memory = getattr(self, "memory", None) or ( + getattr(self.crew, "_memory", None) if self.crew else None + ) + if unified_memory is not None: + query = task.description + matches = unified_memory.recall(query, limit=5) + if matches: + memory = "Relevant memories:\n" + "\n".join( + m.format() for m in matches + ) + if memory.strip() != "": + task_prompt += self.i18n.slice("memory").format(memory=memory) + + crewai_event_bus.emit( + self, + event=MemoryRetrievalCompletedEvent( + task_id=str(task.id) if task else None, + memory_content=memory, + retrieval_time_ms=(time.time() - start_time) * 1000, + source_type="agent", + from_agent=self, + from_task=task, + ), + ) + except Exception as e: + crewai_event_bus.emit( + self, + event=MemoryRetrievalFailedEvent( + task_id=str(task.id) if task else None, + source_type="agent", + from_agent=self, + from_task=task, + error=str(e), + ), + ) + + return task_prompt + + def _finalize_task_execution(self, task: Task, result: Any) -> Any: + """Finalize task execution with RPM cleanup, tool processing, and event emission. + + Args: + task: The task that was executed. + result: The raw execution result. + + Returns: + The processed result. + """ + if self.max_rpm and self._rpm_controller: + self._rpm_controller.stop_rpm_counter() + + result = process_tool_results(self, result) + + output_for_event = result + if ( + AgentResponseProtocol is not None + and isinstance(result, BaseModel) + and isinstance(result, AgentResponseProtocol) + ): + output_for_event = str(result.message) + elif not isinstance(result, str): + output_for_event = str(result) + + crewai_event_bus.emit( + self, + event=AgentExecutionCompletedEvent( + agent=self, task=task, output=output_for_event + ), + ) + + save_last_messages(self) + self._cleanup_mcp_clients() + + return result + + def _check_execution_error(self, e: Exception, task: Task) -> None: + """Check if an execution error should be re-raised immediately. + + Args: + e: The exception that occurred. + task: The task being executed. + + Raises: + Exception: If the error is from litellm, a passthrough, or retries are exhausted. + """ + if e.__class__.__module__.startswith("litellm"): + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + if isinstance(e, _passthrough_exceptions): + raise + self._times_executed += 1 + if self._times_executed > self.max_retry_limit: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + + def _handle_execution_error( + self, + e: Exception, + task: Task, + context: str | None, + tools: list[BaseTool] | None, + ) -> Any: + """Handle execution errors with retry logic (sync path). + + Args: + e: The exception that occurred. + task: The task being executed. + context: Task context. + tools: Task tools. + + Returns: + Result from retried execution. + """ + self._check_execution_error(e, task) + return self.execute_task(task, context, tools) + + async def _handle_execution_error_async( + self, + e: Exception, + task: Task, + context: str | None, + tools: list[BaseTool] | None, + ) -> Any: + """Handle execution errors with retry logic (async path). + + Args: + e: The exception that occurred. + task: The task being executed. + context: Task context. + tools: Task tools. + + Returns: + Result from retried execution. + """ + self._check_execution_error(e, task) + return await self.aexecute_task(task, context, tools) + def execute_task( self, task: Task, @@ -442,71 +682,7 @@ class Agent(BaseAgent): ValueError: If the max execution time is not a positive integer. RuntimeError: If the agent execution fails for other reasons. """ - get_env_context() - # Only call handle_reasoning for legacy CrewAgentExecutor - # For AgentExecutor, planning is handled in AgentExecutor.generate_plan() - if self.executor_class is not AgentExecutor: - handle_reasoning(self, task) - - self._inject_date_to_task(task) - - if self.tools_handler: - self.tools_handler.last_used_tool = None - - task_prompt = task.prompt() - task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n) - task_prompt = format_task_with_context(task_prompt, context, self.i18n) - - if self._is_any_available_memory(): - crewai_event_bus.emit( - self, - event=MemoryRetrievalStartedEvent( - task_id=str(task.id) if task else None, - source_type="agent", - from_agent=self, - from_task=task, - ), - ) - - start_time = time.time() - memory = "" - - try: - unified_memory = getattr(self, "memory", None) or ( - getattr(self.crew, "_memory", None) if self.crew else None - ) - if unified_memory is not None: - query = task.description - matches = unified_memory.recall(query, limit=5) - if matches: - memory = "Relevant memories:\n" + "\n".join( - m.format() for m in matches - ) - if memory.strip() != "": - task_prompt += self.i18n.slice("memory").format(memory=memory) - - crewai_event_bus.emit( - self, - event=MemoryRetrievalCompletedEvent( - task_id=str(task.id) if task else None, - memory_content=memory, - retrieval_time_ms=(time.time() - start_time) * 1000, - source_type="agent", - from_agent=self, - from_task=task, - ), - ) - except Exception as e: - crewai_event_bus.emit( - self, - event=MemoryRetrievalFailedEvent( - task_id=str(task.id) if task else None, - source_type="agent", - from_agent=self, - from_task=task, - error=str(e), - ), - ) + task_prompt = self._prepare_task_execution(task, context) knowledge_config = get_knowledge_config(self) task_prompt = handle_knowledge_retrieval( @@ -518,16 +694,7 @@ class Agent(BaseAgent): self.crew.query_knowledge if self.crew else lambda *a, **k: None, ) - task_prompt = append_skill_context(self, task_prompt) - - prepare_tools(self, tools, task) - task_prompt = apply_training_data(self, task_prompt) - - from crewai.events.types.agent_events import ( - AgentExecutionCompletedEvent, - AgentExecutionErrorEvent, - AgentExecutionStartedEvent, - ) + task_prompt = self._finalize_task_prompt(task_prompt, tools, task) try: crewai_event_bus.emit( @@ -559,57 +726,9 @@ class Agent(BaseAgent): ) raise e except Exception as e: - if e.__class__.__module__.startswith("litellm"): - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - if isinstance(e, _passthrough_exceptions): - raise - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - result = self.execute_task(task, context, tools) + result = self._handle_execution_error(e, task, context, tools) - if self.max_rpm and self._rpm_controller: - self._rpm_controller.stop_rpm_counter() - - result = process_tool_results(self, result) - - output_for_event = result - if ( - AgentResponseProtocol is not None - and isinstance(result, BaseModel) - and isinstance(result, AgentResponseProtocol) - ): - output_for_event = str(result.message) - elif not isinstance(result, str): - output_for_event = str(result) - - crewai_event_bus.emit( - self, - event=AgentExecutionCompletedEvent( - agent=self, task=task, output=output_for_event - ), - ) - - save_last_messages(self) - self._cleanup_mcp_clients() - - return result + return self._finalize_task_execution(task, result) def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any: """Execute a task with a timeout. @@ -626,8 +745,6 @@ class Agent(BaseAgent): TimeoutError: If execution exceeds the timeout. RuntimeError: If execution fails for other reasons. """ - import concurrent.futures - ctx = contextvars.copy_context() with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit( @@ -691,85 +808,14 @@ class Agent(BaseAgent): ValueError: If the max execution time is not a positive integer. RuntimeError: If the agent execution fails for other reasons. """ - if self.executor_class is not AgentExecutor: - handle_reasoning( - self, task - ) # we need this till CrewAgentExecutor migrates to AgentExecutor - self._inject_date_to_task(task) - - if self.tools_handler: - self.tools_handler.last_used_tool = None - - task_prompt = task.prompt() - task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n) - task_prompt = format_task_with_context(task_prompt, context, self.i18n) - - if self._is_any_available_memory(): - crewai_event_bus.emit( - self, - event=MemoryRetrievalStartedEvent( - task_id=str(task.id) if task else None, - source_type="agent", - from_agent=self, - from_task=task, - ), - ) - - start_time = time.time() - memory = "" - - try: - unified_memory = getattr(self, "memory", None) or ( - getattr(self.crew, "_memory", None) if self.crew else None - ) - if unified_memory is not None: - query = task.description - matches = unified_memory.recall(query, limit=5) - if matches: - memory = "Relevant memories:\n" + "\n".join( - m.format() for m in matches - ) - if memory.strip() != "": - task_prompt += self.i18n.slice("memory").format(memory=memory) - - crewai_event_bus.emit( - self, - event=MemoryRetrievalCompletedEvent( - task_id=str(task.id) if task else None, - memory_content=memory, - retrieval_time_ms=(time.time() - start_time) * 1000, - source_type="agent", - from_agent=self, - from_task=task, - ), - ) - except Exception as e: - crewai_event_bus.emit( - self, - event=MemoryRetrievalFailedEvent( - task_id=str(task.id) if task else None, - source_type="agent", - from_agent=self, - from_task=task, - error=str(e), - ), - ) + task_prompt = self._prepare_task_execution(task, context) knowledge_config = get_knowledge_config(self) task_prompt = await ahandle_knowledge_retrieval( self, task, task_prompt, knowledge_config ) - task_prompt = append_skill_context(self, task_prompt) - - prepare_tools(self, tools, task) - task_prompt = apply_training_data(self, task_prompt) - - from crewai.events.types.agent_events import ( - AgentExecutionCompletedEvent, - AgentExecutionErrorEvent, - AgentExecutionStartedEvent, - ) + task_prompt = self._finalize_task_prompt(task_prompt, tools, task) try: crewai_event_bus.emit( @@ -801,57 +847,9 @@ class Agent(BaseAgent): ) raise e except Exception as e: - if e.__class__.__module__.startswith("litellm"): - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - if isinstance(e, _passthrough_exceptions): - raise - self._times_executed += 1 - if self._times_executed > self.max_retry_limit: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - result = await self.aexecute_task(task, context, tools) + result = await self._handle_execution_error_async(e, task, context, tools) - if self.max_rpm and self._rpm_controller: - self._rpm_controller.stop_rpm_counter() - - result = process_tool_results(self, result) - - output_for_event = result - if ( - AgentResponseProtocol is not None - and isinstance(result, BaseModel) - and isinstance(result, AgentResponseProtocol) - ): - output_for_event = str(result.message) - elif not isinstance(result, str): - output_for_event = str(result) - - crewai_event_bus.emit( - self, - event=AgentExecutionCompletedEvent( - agent=self, task=task, output=output_for_event - ), - ) - - save_last_messages(self) - self._cleanup_mcp_clients() - - return result + return self._finalize_task_execution(task, result) async def _aexecute_with_timeout( self, task_prompt: str, task: Task, timeout: int @@ -904,17 +902,19 @@ class Agent(BaseAgent): ) return result["output"] - def create_agent_executor( - self, tools: list[BaseTool] | None = None, task: Task | None = None - ) -> None: - """Create an agent executor for the agent. + def _build_execution_prompt( + self, raw_tools: list[BaseTool] + ) -> tuple[ + SystemPromptResult | StandardPromptResult, list[str], Callable[[], bool] | None + ]: + """Build the execution prompt, stop words, and RPM limit function. + + Args: + raw_tools: The raw tools available to the agent. Returns: - An instance of the CrewAgentExecutor class. + A tuple of (prompt, stop_words, rpm_limit_fn). """ - raw_tools: list[BaseTool] = tools or self.tools or [] - parsed_tools = parse_tools(raw_tools) - use_native_tool_calling = self._supports_native_tool_calling(raw_tools) prompt = Prompts( @@ -929,7 +929,6 @@ class Agent(BaseAgent): ).task_execution() stop_words = [self.i18n.slice("observation")] - if self.response_template: stop_words.append( self.response_template.split("{{ .Response }}")[1].strip() @@ -939,6 +938,21 @@ class Agent(BaseAgent): self._rpm_controller.check_or_wait if self._rpm_controller else None ) + return prompt, stop_words, rpm_limit_fn + + def create_agent_executor( + self, tools: list[BaseTool] | None = None, task: Task | None = None + ) -> None: + """Create an agent executor for the agent. + + Returns: + An instance of the CrewAgentExecutor class. + """ + raw_tools: list[BaseTool] = tools or self.tools or [] + parsed_tools = parse_tools(raw_tools) + + prompt, stop_words, rpm_limit_fn = self._build_execution_prompt(raw_tools) + if self.agent_executor is not None: self._update_executor_parameters( task=task, @@ -1052,17 +1066,18 @@ class Agent(BaseAgent): @staticmethod def get_multimodal_tools() -> Sequence[BaseTool]: + """Return tools for multimodal agent capabilities.""" from crewai.tools.agent_tools.add_image_tool import AddImageTool return [AddImageTool()] def get_code_execution_tools(self) -> list[CodeInterpreterTool]: + """Return code interpreter tools based on the agent's execution mode.""" try: from crewai_tools import ( CodeInterpreterTool, ) - # Set the unsafe_mode based on the code_execution_mode attribute unsafe_mode = self.code_execution_mode == "unsafe" return [CodeInterpreterTool(unsafe_mode=unsafe_mode)] except ModuleNotFoundError: @@ -1075,6 +1090,7 @@ class Agent(BaseAgent): def get_output_converter( llm: BaseLLM, text: str, model: type[BaseModel], instructions: str ) -> Converter: + """Create a Converter instance for transforming LLM output to a structured model.""" return Converter(llm=llm, text=text, model=model, instructions=instructions) def _training_handler(self, task_prompt: str) -> str: @@ -1124,8 +1140,6 @@ class Agent(BaseAgent): def _inject_date_to_task(self, task: Task) -> None: """Inject the current date into the task description if inject_date is enabled.""" if self.inject_date: - from datetime import datetime - try: valid_format_codes = [ "%Y", @@ -1159,7 +1173,7 @@ class Agent(BaseAgent): try: subprocess.run( # noqa: S603 - [docker_path, "info"], + [str(docker_path), "info"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -1187,6 +1201,7 @@ class Agent(BaseAgent): return self.security_config.fingerprint def set_fingerprint(self, fingerprint: Fingerprint) -> None: + """Set the agent's security fingerprint.""" self.security_config.fingerprint = fingerprint @property @@ -1228,15 +1243,11 @@ class Agent(BaseAgent): return None try: - rewritten_query = self.llm.call( - [ - { - "role": "system", - "content": rewriter_prompt, - }, - {"role": "user", "content": query}, - ] - ) + messages: list[LLMMessage] = [ + {"role": "system", "content": rewriter_prompt}, + {"role": "user", "content": query}, + ] + rewritten_query = self.llm.call(messages) crewai_event_bus.emit( self, event=KnowledgeQueryCompletedEvent( @@ -1277,7 +1288,6 @@ class Agent(BaseAgent): Returns: Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution. """ - # Process platform apps and MCP tools if self.apps: platform_tools = self.get_platform_tools(self.apps) if platform_tools: @@ -1291,7 +1301,6 @@ class Agent(BaseAgent): self.tools = [] self.tools.extend(mcps) - # Prepare tools raw_tools: list[BaseTool] = self.tools or [] # Inject memory tools for standalone kickoff (crew path handles its own) @@ -1308,7 +1317,6 @@ class Agent(BaseAgent): parsed_tools = parse_tools(raw_tools) - # Build agent_info for backward-compatible event emission agent_info = { "id": self.id, "role": self.role, @@ -1318,35 +1326,9 @@ class Agent(BaseAgent): "verbose": self.verbose, } - # Build prompt for standalone execution - use_native_tool_calling = self._supports_native_tool_calling(raw_tools) - prompt = Prompts( - agent=self, - has_tools=len(raw_tools) > 0, - use_native_tool_calling=use_native_tool_calling, - i18n=self.i18n, - use_system_prompt=self.use_system_prompt, - system_template=self.system_template, - prompt_template=self.prompt_template, - response_template=self.response_template, - ).task_execution() + prompt, stop_words, rpm_limit_fn = self._build_execution_prompt(raw_tools) - # Prepare stop words - stop_words = [self.i18n.slice("observation")] - if self.response_template: - stop_words.append( - self.response_template.split("{{ .Response }}")[1].strip() - ) - - # Get RPM limit function - rpm_limit_fn = ( - self._rpm_controller.check_or_wait if self._rpm_controller else None - ) - - # Create the executor for standalone mode (no crew, no task) executor = AgentExecutor( - task=None, - crew=None, llm=cast(BaseLLM, self.llm), agent=self, prompt=prompt, @@ -1425,7 +1407,6 @@ class Agent(BaseAgent): formatted_messages = append_skill_context(self, formatted_messages) - # Build the input dict for the executor inputs: dict[str, Any] = { "input": formatted_messages, "tool_names": get_tool_names(parsed_tools), @@ -1487,36 +1468,65 @@ class Agent(BaseAgent): ) output = self._execute_and_build_output(executor, inputs, response_format) - if self.guardrail is not None: - output = self._process_kickoff_guardrail( - output=output, - executor=executor, - inputs=inputs, - response_format=response_format, - ) - - # Save to memory after execution (passive save) - self._save_kickoff_to_memory(messages, output.raw) - - crewai_event_bus.emit( - self, - event=LiteAgentExecutionCompletedEvent( - agent_info=agent_info, - output=output.raw, - ), + return self._finalize_kickoff( + output, executor, inputs, response_format, messages, agent_info ) - return output - except Exception as e: - crewai_event_bus.emit( - self, - event=LiteAgentExecutionErrorEvent( - agent_info=agent_info, - error=str(e), - ), + self._emit_kickoff_error(agent_info, e) + + def _finalize_kickoff( + self, + output: LiteAgentOutput, + executor: AgentExecutor, + inputs: dict[str, str], + response_format: type[Any] | None, + messages: str | list[LLMMessage], + agent_info: dict[str, Any], + ) -> LiteAgentOutput: + """Apply guardrails, save to memory, and emit completion event. + + Args: + output: The execution output. + executor: The agent executor. + inputs: The execution inputs. + response_format: Optional response format. + messages: The original messages. + agent_info: Agent metadata for events. + + Returns: + The finalized output. + """ + if self.guardrail is not None: + output = self._process_kickoff_guardrail( + output=output, + executor=executor, + inputs=inputs, + response_format=response_format, ) - raise + + self._save_kickoff_to_memory(messages, output.raw) + + crewai_event_bus.emit( + self, + event=LiteAgentExecutionCompletedEvent( + agent_info=agent_info, + output=output.raw, + ), + ) + + return output + + def _emit_kickoff_error(self, agent_info: dict[str, Any], e: Exception) -> NoReturn: + """Emit a kickoff error event and re-raise.""" + crewai_event_bus.emit( + self, + event=LiteAgentExecutionErrorEvent( + agent_info=agent_info, + error=str(e), + ), + ) + raise e def _save_kickoff_to_memory( self, messages: str | list[LLMMessage], output_text: str @@ -1562,11 +1572,8 @@ class Agent(BaseAgent): Returns: LiteAgentOutput with raw output, formatted result, and metrics. """ - import json - output = result.get("output", "") - # Handle response format conversion formatted_result: BaseModel | None = None raw_output: str @@ -1583,7 +1590,7 @@ class Agent(BaseAgent): ) converter = Converter( - llm=self.llm, + llm=cast(BaseLLM, self.llm), text=raw_output, model=response_format, instructions=instructions, @@ -1597,7 +1604,6 @@ class Agent(BaseAgent): else: raw_output = str(output) if not isinstance(output, str) else output - # Get token usage metrics if isinstance(self.llm, BaseLLM): usage_metrics = self.llm.get_token_usage_summary() else: @@ -1665,9 +1671,6 @@ class Agent(BaseAgent): Returns: Validated/updated output. """ - from crewai.utilities.guardrail_types import GuardrailCallable - - # Ensure guardrail is callable guardrail_callable: GuardrailCallable if isinstance(self.guardrail, str): from crewai.tasks.llm_guardrail import LLMGuardrail @@ -1697,16 +1700,13 @@ class Agent(BaseAgent): f"Last error: {guardrail_result.error}" ) - # Add feedback and re-execute executor._append_message_to_state( guardrail_result.error or "Guardrail validation failed", role="user", ) - # Re-execute and build new output output = self._execute_and_build_output(executor, inputs, response_format) - # Recursively retry guardrail return self._process_kickoff_guardrail( output=output, executor=executor, @@ -1715,7 +1715,6 @@ class Agent(BaseAgent): retry_count=retry_count + 1, ) - # Apply guardrail result if available if guardrail_result.result is not None: if isinstance(guardrail_result.result, str): output.raw = guardrail_result.result @@ -1765,37 +1764,12 @@ class Agent(BaseAgent): output = await self._execute_and_build_output_async( executor, inputs, response_format ) - - if self.guardrail is not None: - output = self._process_kickoff_guardrail( - output=output, - executor=executor, - inputs=inputs, - response_format=response_format, - ) - - # Save to memory after async execution (passive save) - self._save_kickoff_to_memory(messages, output.raw) - - crewai_event_bus.emit( - self, - event=LiteAgentExecutionCompletedEvent( - agent_info=agent_info, - output=output.raw, - ), + return self._finalize_kickoff( + output, executor, inputs, response_format, messages, agent_info ) - return output - except Exception as e: - crewai_event_bus.emit( - self, - event=LiteAgentExecutionErrorEvent( - agent_info=agent_info, - error=str(e), - ), - ) - raise + self._emit_kickoff_error(agent_info, e) async def akickoff( self, @@ -1816,7 +1790,6 @@ class Agent(BaseAgent): return await self.kickoff_async(messages, response_format, input_files) -# Rebuild Agent model to resolve A2A type forward references try: from crewai.a2a.config import ( A2AClientConfig as _A2AClientConfig, diff --git a/lib/crewai/tests/agents/test_agent_inject_date.py b/lib/crewai/tests/agents/test_agent_inject_date.py index 7ff6b1440..0ca9da18f 100644 --- a/lib/crewai/tests/agents/test_agent_inject_date.py +++ b/lib/crewai/tests/agents/test_agent_inject_date.py @@ -4,13 +4,15 @@ from unittest.mock import patch from crewai.agent import Agent from crewai.task import Task +MOCK_TARGET = "crewai.agent.core.datetime" + def test_agent_inject_date(): """Test that the inject_date flag injects the current date into the task. Tests that when inject_date=True, the current date is added to the task description. """ - with patch("datetime.datetime") as mock_datetime: + with patch(MOCK_TARGET) as mock_datetime: mock_datetime.now.return_value = datetime(2025, 1, 1) agent = Agent( @@ -26,7 +28,6 @@ def test_agent_inject_date(): agent=agent, ) - # Store original description original_description = task.description agent._inject_date_to_task(task) @@ -44,7 +45,6 @@ def test_agent_without_inject_date(): role="test_agent", goal="test_goal", backstory="test_backstory", - # inject_date is False by default ) task = Task( @@ -65,7 +65,7 @@ def test_agent_inject_date_custom_format(): Tests that when inject_date=True with a custom date_format, the date is formatted correctly. """ - with patch("datetime.datetime") as mock_datetime: + with patch(MOCK_TARGET) as mock_datetime: mock_datetime.now.return_value = datetime(2025, 1, 1) agent = Agent( @@ -82,7 +82,6 @@ def test_agent_inject_date_custom_format(): agent=agent, ) - # Store original description original_description = task.description agent._inject_date_to_task(task)