Files
crewAI/lib/crewai/src/crewai/agent/core.py
João Moura bb477f8a91
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
JSON first crews (#6131)
* feat(cli): introduce JSON crew project support and TUI enhancements

- Added support for creating and running JSON-defined crew projects, allowing users to scaffold projects with a new `create_json_crew.py` file.
- Implemented a full-screen Textual TUI for crew execution in `crew_run_tui.py`, enhancing user interaction with a two-column layout.
- Updated `run_crew.py` to prioritize JSON crew projects and added daemon mode for running without TUI.
- Introduced interactive pickers in `tui_picker.py` for improved CLI prompts.
- Enhanced validation for JSON crew files in `validate.py` to ensure proper structure and agent definitions.
- Updated `.gitignore` to exclude demo and crewai directories.

* feat: update LLM model references to gpt-5.4-mini

- Changed default LLM model from gpt-4o-mini to gpt-5.4-mini across various files, including CLI options, JSON crew configurations, and agent definitions.
- Enhanced benchmark and human feedback functionalities to utilize the new model.
- Improved user interface elements in the TUI for better interaction and feedback during execution.
- Added support for new skills directory in JSON crew project creation.

* feat(benchmark): add crew-level benchmarking functionality

- Introduced a new `benchmark` command in the CLI for crew-level benchmarking, allowing users to specify agents, models, and timeout settings.
- Implemented `CrewBenchmarkCase` to handle crew-level benchmark cases with inputs and criteria.
- Enhanced the benchmark runner to support progress tracking and detailed reporting of results for multiple models.
- Added tests for loading crew benchmark cases and validating their structure.
- Updated existing benchmark functions to accommodate the new crew-level execution model.

* feat(cli): enhance JSON crew project functionality and TUI improvements

- Added optional agent-level guardrails and advanced options in JSON crew configurations to improve output validation and flexibility.
- Updated the TUI to better handle plan step statuses, including visual indicators for task completion and failure.
- Introduced methods for parsing and managing step observation events, ensuring accurate updates to task statuses during execution.
- Enhanced validation for JSON crew projects, ensuring proper structure and error handling for agent and task definitions.
- Added comprehensive tests for new features and validation logic, ensuring robustness in JSON crew project handling.

* refactor(cli): streamline JSON crew project handling and improve validation

- Refactored JSON crew project loading and validation logic to enhance clarity and maintainability.
- Introduced utility functions for finding JSON crew files, improving code reuse across modules.
- Removed deprecated benchmark functionality and associated tests to simplify the codebase.
- Updated CLI commands to utilize the new JSON project structure, ensuring compatibility with recent changes.
- Enhanced test coverage for JSON crew project features, ensuring robust validation and error handling.

* feat(cli): enhance activity log navigation and focus management

- Added functionality to focus on the activity log when navigating through log entries.
- Implemented refresh logic for the log panel to ensure updates are displayed correctly during navigation.
- Improved keyboard navigation for log entries, allowing users to expand and scroll through logs seamlessly.
- Added tests to verify the correct behavior of log navigation and focus management in the TUI.

* feat(cli): enhance JSON crew project interaction and input handling

- Introduced a new function to enable prompt line editing for better user experience during input prompts.
- Updated the JSON crew project wizards to show interpolation hints for dynamic values, improving user guidance.
- Enhanced the handling of missing input placeholders by prompting users for required values during crew setup.
- Refactored the crew run logic to ensure proper loading and preparation of JSON-defined crews, including runtime input management.
- Added tests to verify the correct behavior of new input handling features and JSON crew project interactions.

* feat(cli): improve crew project input prompts and event handling

- Enhanced the `_prompt_text` function to allow for configurable spacing before prompts, improving user experience during input collection.
- Updated the wizards for agent and task creation to utilize the new prompt configuration, ensuring a more compact and streamlined interaction.
- Introduced new plan step lifecycle events (`PlanStepStartedEvent`, `PlanStepCompletedEvent`) to better track the execution status of plan steps.
- Refactored the step executor to emit these events during the execution of tasks, improving observability and debugging capabilities.
- Added tests to verify the correct behavior of new prompt handling and event emissions during crew project execution.

* fix: refine json-first crew interactions

* fix: prioritize common json crew tools

* fix: make json crew more tools expandable

* fix: show json crew tools by category

* feat(memory): update default embedder to OpenAI text-embedding-3-large and enhance memory compatibility

- Changed the default embedding model for Memory to OpenAI text-embedding-3-large, which uses 3072-dimensional vectors.
- Added warnings regarding compatibility issues with existing local memory stores created with 1536-dimensional embeddings.
- Updated documentation to reflect the new default embedder and its configuration options.
- Enhanced the CLI and codebase to support the new embedding model across various components, ensuring a seamless transition for users.

* fix: address PR review feedback for JSON-first crews

Review blockers:
- Forward trained_agents_file to JSON crews: crewai run -f now exports
  CREWAI_TRAINED_AGENTS_FILE for the in-process JSON crew path
- Wizard agent picker: Esc/cancel now reprompts instead of silently
  assigning the first agent
- JSON tool resolution hard-fails: unknown tool names, missing custom
  tool files, and invalid custom tool modules raise JSONProjectError
  with actionable messages instead of warn-and-continue
- Embedding dimension mismatch: LanceDB and Qdrant Edge storages raise
  EmbeddingDimensionMismatchError with reset/pin guidance instead of
  silently zero-filling vectors or returning empty search results
- Custom tool code execution documented in loader docstring and the
  scaffolded project README

CI fixes:
- ruff format across lib/
- All 133 PR-introduced mypy errors fixed (llm.py lazy-litellm and
  cli.py lazy command shims now use TYPE_CHECKING imports; textual
  is_mounted misuse fixed; pick_many overloads; misc annotations)

Bot review comments:
- Empty except blocks now have explanatory comments or debug logging
- Removed unused _C_BG/_C_PANEL/_C_BORDER globals and redundant
  import re; tests use a single import style for create_json_crew

Tests: trained-agents propagation, wizard cancel, tool resolution
failures, and dimension mismatch guidance.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix: address second round of PR review comments

Cursor Bugbot:
- Wizard agent slugs: strip to [a-z0-9_] and fall back to agent_<n> so
  symbol-only roles can't produce an empty agents/.jsonc filename
- Wizard task names: dedupe against prior task names and fall back to
  task_<n> for symbol-only descriptions

CodeRabbit:
- Agent.message(): import Task explicitly at runtime instead of relying
  on the namespace injection done by crewai/__init__
- Async executor: move the native-tools-unsupported fallback from
  _ainvoke_loop_react (self-recursion) to _ainvoke_loop_native_tools,
  mirroring the sync implementation
- StepExecutor downgrade: keep the in-step conversation and append the
  text-tooling instructions instead of rebuilding messages, so completed
  native tool calls are not re-executed
- crewai-files: extension-based MIME lookup now runs before byte
  sniffing so csv/xml types are not degraded to text/plain
- Memory storages: validate every record in a save() batch against a
  consistent embedding dimension (LanceDB previously checked only the
  first record); added mixed-batch tests
- _print_post_tui_summary now typed against CrewRunApp
- Docs: Azure OpenAI default embedder change called out in the memory
  migration warning and provider table

Code quality bots:
- Removed unused _C_YELLOW/_C_CYAN (crew_run_tui) and _GREEN (tui_picker)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(cli): accordion tool picker in JSON crew wizard

The flat tool list had grown to ~90 rows. The picker now shows:
- Common tools always visible at the top
- Every other category as a single expandable row with tool and
  selection counts (e.g. "Search & Research  (27 tools, 2 selected)")
- Expanding a category collapses the previously expanded one
- Selections persist across expand/collapse via new preselected
  support in pick_many; cursor follows the toggled category row

tui_picker gains preselected + initial_cursor options on pick_many,
and Esc in multi-select now confirms the current selection instead of
discarding it (required so collapsing can't silently drop choices).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* refactor(cli): remove --daemon flag from crewai run

The flag only affected JSON crew projects — classic and flow projects
ignored it entirely, which made the behavior inconsistent. Removed the
option, the daemon code path (_run_json_crew_daemon), and its helper
(_load_json_crew_with_inputs).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test: update run command tests after --daemon removal

lib/crewai/tests/cli/test_run_crew.py still asserted the old
run_crew(trained_agents_file=..., daemon=False) call signature.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): exit codes, mid-run quit, async statuses, hyphen placeholders

Addresses the latest Bugbot review round:

- Failed JSON crew runs now exit non-zero (SystemExit(1)) so scripts
  and CI don't treat failures as success, mirroring the classic path
- Quitting the TUI mid-run now ends the process (os._exit(130));
  kickoff runs in a thread worker that cannot be force-cancelled, so
  letting the CLI return would leave LLM/tool work burning tokens in
  the background
- Sidebar task statuses are now async-safe: completion/failure events
  resolve the task's own row via identity instead of assuming the most
  recently started task, and starting a task no longer blanket-marks
  earlier active rows as done
- The runtime-input prompt regex now accepts hyphenated placeholder
  names ({my-topic}), matching kickoff's interpolation pattern

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix: validation safety, custom tool sandboxing, TUI log integrity, memory error surfacing

- Deploy validation no longer executes project code: validation mode
  checks tool declarations structurally (well-formed entries, custom
  tool file exists) without importing or instantiating anything.
  custom:<name> resolution only happens on the actual run path.
- custom:<name> is constrained to [A-Za-z_][A-Za-z0-9_]* and the
  resolved path must stay inside the project's tools/ directory, so
  custom:../foo or absolute-path names cannot execute code outside it.
  Tool paths resolve relative to the crew project root, not cwd.
- TUI task logs are built from per-task state captured at task start
  (idx, description, agent, start time); an out-of-order completion
  takes its output from the event and no longer steals or resets the
  current task's streamed steps/output.
- EmbeddingDimensionMismatchError now inherits ValueError instead of
  RuntimeError so background saves surface it through
  MemorySaveFailedEvent instead of silently dropping the save; the
  shutdown catch in _background_encode_batch is narrowed to the
  "cannot schedule new futures" case.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): declared project type wins over crew.json presence

A flow project that also contains a crew.json(c) file now runs and
validates as the flow it declares in pyproject.toml instead of being
hijacked by the JSON crew path. Both crewai run (_has_json_crew) and
deploy validation (_is_json_crew) check tool.crewai.type; a missing or
unreadable pyproject still means a bare JSON crew project.

Also documents why StepObservationFailedEvent intentionally marks the
plan step "done": the event signals an observer failure, not a step
failure, and the executor continues past it.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(cli): type the declared_type locals so mypy stays clean

Comparing an Any-typed .get() chain returns Any, which tripped
no-any-return on the previous commit.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-14 04:19:48 -03:00

1978 lines
72 KiB
Python

"""Core agent implementation for the CrewAI framework."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine, Sequence
import concurrent.futures
import contextvars
from datetime import datetime
import inspect
import json
import os
from pathlib import Path
import time
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Literal,
NoReturn,
cast,
)
import warnings
from pydantic import (
BaseModel,
BeforeValidator,
ConfigDict,
Field,
PrivateAttr,
ValidationError,
model_validator,
)
from pydantic.functional_serializers import PlainSerializer
from typing_extensions import Self, TypeIs
from crewai.agent.planning_config import PlanningConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
apply_training_data,
build_task_prompt_with_schema,
format_task_with_context,
get_knowledge_config,
handle_knowledge_retrieval,
prepare_tools,
process_tool_results,
save_last_messages,
validate_max_execution_time,
)
from crewai.agents.agent_builder.base_agent import (
BaseAgent,
_serialize_llm_ref,
_validate_llm_ref,
)
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
)
from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalFailedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.events.types.skill_events import SkillActivatedEvent
from crewai.experimental.agent_executor import AgentExecutor
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.lite_agent_output import LiteAgentOutput
from crewai.llms.base_llm import BaseLLM
from crewai.mcp.config import MCPServerConfig
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.fingerprint import Fingerprint
from crewai.skills.loader import activate_skill, discover_skills
from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel
from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
)
from crewai.utilities.constants import (
CREWAI_TRAINED_AGENTS_FILE_ENV,
TRAINED_AGENTS_DATA_FILE,
TRAINING_DATA_FILE,
)
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.env import get_env_context
from crewai.utilities.guardrail import process_guardrail, serialize_guardrail_for_json
from crewai.utilities.guardrail_types import GuardrailCallable, GuardrailType
from crewai.utilities.i18n import I18N_DEFAULT
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.types import AgentResponseProtocol
except ImportError:
AgentResponseProtocol = None # type: ignore[assignment, misc]
if TYPE_CHECKING:
from crewai_files import FileInput
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.agents.agent_builder.base_agent import PlatformAppOrAction
from crewai.mcp.tool_resolver import MCPToolResolver
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.types import LLMMessage
_passthrough_exceptions: tuple[type[Exception], ...] = ()
_EXECUTOR_CLASS_MAP: dict[str, type] = {
"CrewAgentExecutor": CrewAgentExecutor,
"AgentExecutor": AgentExecutor,
}
def _is_resuming_agent_executor(
executor: CrewAgentExecutor | AgentExecutor | None,
) -> TypeIs[AgentExecutor]:
"""Type guard: True when the executor is resuming from a checkpoint."""
return isinstance(executor, AgentExecutor) and executor._resuming
def _validate_executor_class(value: Any) -> Any:
if isinstance(value, str):
cls = _EXECUTOR_CLASS_MAP.get(value)
if cls is None:
raise ValueError(f"Unknown executor class: {value}")
value = cls
import warnings
if value is CrewAgentExecutor:
warnings.warn(
"CrewAgentExecutor is deprecated and will be removed in a future release. "
"Agents inside Crews now use AgentExecutor by default. "
"Switch to crewai.experimental.AgentExecutor.",
DeprecationWarning,
stacklevel=3,
)
return value
def _serialize_executor_class(value: Any) -> str:
return value.__name__ if isinstance(value, type) else str(value)
class Agent(BaseAgent):
"""Represents an agent in a system.
Each agent has a role, a goal, a backstory, and an optional language model (llm).
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor or AgentExecutor class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
knowledge: The knowledge base of the agent.
config: Dict representation of agent configuration.
llm: The language model that will run the agent.
function_calling_llm: The language model that will handle the tool calling for this agent, it overrides the crew function_calling_llm.
max_iter: Maximum number of iterations for an agent to execute a task.
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
embedder: Embedder configuration for the agent.
apps: List of applications that the agent can access through CrewAI Platform.
mcps: List of MCP server references for tool integration.
"""
model_config = ConfigDict()
_times_executed: int = PrivateAttr(default=0)
_mcp_resolver: MCPToolResolver | None = PrivateAttr(default=None)
_last_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
max_execution_time: int | None = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
)
step_callback: SerializableCallable | None = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
)
use_system_prompt: bool | None = Field(
default=True,
description="Use system prompt for the agent.",
)
llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
function_calling_llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(
description="Language model that will run the agent.",
default=None,
deprecated="function_calling_llm is deprecated and will be removed in a future release.",
)
system_template: str | None = Field(
default=None, description="System format for the agent."
)
prompt_template: str | None = Field(
default=None, description="Prompt format for the agent."
)
response_template: str | None = Field(
default=None, description="Response format for the agent."
)
allow_code_execution: bool | None = Field(
default=False,
deprecated=True,
description="Deprecated. CodeInterpreterTool is no longer available. Use dedicated sandbox services instead.",
)
respect_context_window: bool = Field(
default=True,
description="Keep messages under the context window size by summarizing content.",
)
max_retry_limit: int = Field(
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
multimodal: bool = Field(
default=False,
deprecated=True,
description="[DEPRECATED, will be removed in v2.0 - pass files natively.] Whether the agent is multimodal.",
)
inject_date: bool = Field(
default=False,
description="Whether to automatically inject the current date into tasks.",
)
date_format: str = Field(
default="%Y-%m-%d",
description="Format string for date when inject_date is enabled.",
)
code_execution_mode: Literal["safe", "unsafe"] = Field(
default="safe",
deprecated=True,
description="Deprecated. CodeInterpreterTool is no longer available. Use dedicated sandbox services instead.",
)
planning_config: PlanningConfig | None = Field(
default=None,
description="Configuration for agent planning before task execution.",
)
planning: bool = Field(
default=False,
description="Whether the agent should reflect and create a plan before executing a task.",
)
reasoning: bool = Field(
default=False,
description="[DEPRECATED: Use planning_config instead] Whether the agent should reflect and create a plan before executing a task.",
deprecated=True,
)
max_reasoning_attempts: int | None = Field(
default=None,
description="[DEPRECATED: Use planning_config.max_attempts instead] Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
deprecated=True,
)
embedder: EmbedderConfig | None = Field(
default=None,
description="Embedder configuration for the agent.",
)
agent_knowledge_context: str | None = Field(
default=None,
description="Knowledge context for the agent.",
)
crew_knowledge_context: str | None = Field(
default=None,
description="Knowledge context for the crew.",
)
knowledge_search_query: str | None = Field(
default=None,
description="Knowledge search query for the agent dynamically generated by the agent.",
)
from_repository: str | None = Field(
default=None,
description="The Agent's role to be used from your repository.",
)
guardrail: Annotated[
GuardrailType | None,
PlainSerializer(
serialize_guardrail_for_json,
return_type=str | None,
when_used="json",
),
] = Field(
default=None,
description="Function or string description of a guardrail to validate agent output",
)
guardrail_max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails"
)
a2a: (
list[A2AConfig | A2AServerConfig | A2AClientConfig]
| A2AConfig
| A2AServerConfig
| A2AClientConfig
| None
) = Field(
default=None,
description="""
A2A (Agent-to-Agent) configuration for delegating tasks to remote agents.
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""",
)
agent_executor: CrewAgentExecutor | AgentExecutor | None = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
executor_class: Annotated[
type[CrewAgentExecutor] | type[AgentExecutor],
BeforeValidator(_validate_executor_class),
PlainSerializer(_serialize_executor_class, return_type=str, when_used="json"),
] = Field(
default=AgentExecutor,
description="Class to use for the agent executor. Defaults to AgentExecutor, can optionally use CrewAgentExecutor.",
)
@model_validator(mode="before")
@classmethod
def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any:
"""Merge repository agent config with provided values before validation."""
if v is not None and (from_repository := v.get("from_repository")):
return load_agent_from_repository(from_repository) | v
return v
@model_validator(mode="after")
def post_init_setup(self) -> Self:
"""Initialize LLM, executor, code tools, and skills after model creation."""
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(
self.function_calling_llm, BaseLLM
):
self.function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()
if self.allow_code_execution:
warnings.warn(
"allow_code_execution is deprecated and will be removed in v2.0. "
"CodeInterpreterTool is no longer available. "
"Use dedicated sandbox services like E2B or Modal.",
DeprecationWarning,
stacklevel=2,
)
self.set_skills()
if self.reasoning and self.planning_config is None:
warnings.warn(
"The 'reasoning' parameter is deprecated. Use 'planning_config=PlanningConfig()' instead.",
DeprecationWarning,
stacklevel=2,
)
kwargs: dict[str, int] = {}
if self.max_reasoning_attempts is not None:
kwargs["max_attempts"] = self.max_reasoning_attempts
self.planning_config = PlanningConfig(**kwargs)
if self.planning and self.planning_config is None:
# Bare planning=True should be bounded and avoid per-step
# PlannerObserver LLM calls unless explicitly configured.
self.planning_config = PlanningConfig(
reasoning_effort="low",
max_attempts=1,
)
return self
@property
def planning_enabled(self) -> bool:
"""Check if planning is enabled for this agent."""
return self.planning_config is not None or self.planning
def _setup_agent_executor(self) -> None:
"""Initialize the agent executor with a default cache handler."""
if not self.cache_handler:
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None:
"""Initialize knowledge sources with the agent or crew embedder config."""
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
if self.knowledge_sources:
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
):
self.knowledge = Knowledge(
sources=self.knowledge_sources,
embedder=self.embedder,
collection_name=self.role,
)
self.knowledge.add_sources()
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {e!s}") from e
def set_skills(
self,
resolved_crew_skills: list[SkillModel] | None = None,
) -> None:
"""Resolve skill paths while preserving explicit disclosure levels.
Path entries trigger discovery and activation because directory-based
skills opt into eager loading. Pre-loaded Skill objects keep their
current disclosure level so callers can attach METADATA-only skills and
progressively activate them later. Crew-level skills are merged in with
event emission so observability is consistent regardless of origin.
Args:
resolved_crew_skills: Pre-resolved crew skills. When provided,
avoids redundant discovery per agent.
"""
from crewai.crew import Crew
if resolved_crew_skills is None:
crew_skills: list[Path | SkillModel | str] | None = (
self.crew.skills
if isinstance(self.crew, Crew) and isinstance(self.crew.skills, list)
else None
)
else:
crew_skills = list(resolved_crew_skills)
if not self.skills and not crew_skills:
return
needs_work = self.skills and any(
isinstance(s, (Path, str))
or (isinstance(s, SkillModel) and s.disclosure_level < INSTRUCTIONS)
for s in self.skills
)
if not needs_work and not crew_skills:
return
seen: set[str] = set()
resolved: list[Path | SkillModel | str] = []
items: list[Path | SkillModel | str] = list(self.skills) if self.skills else []
if crew_skills:
items.extend(crew_skills)
for item in items:
if isinstance(item, str):
from crewai.experimental.skills.registry import (
is_registry_ref,
parse_registry_ref,
resolve_registry_ref,
)
if is_registry_ref(item):
skill = resolve_registry_ref(item, source=self)
org, _ = parse_registry_ref(item)
dedup_key = f"{org}/{skill.name}"
if dedup_key not in seen:
seen.add(dedup_key)
resolved.append(skill)
elif isinstance(item, Path):
discovered = discover_skills(item, source=self)
for skill in discovered:
if skill.name not in seen:
seen.add(skill.name)
resolved.append(activate_skill(skill, source=self))
elif isinstance(item, SkillModel):
if item.name not in seen:
seen.add(item.name)
if item.disclosure_level >= INSTRUCTIONS:
crewai_event_bus.emit(
self,
event=SkillActivatedEvent(
from_agent=self,
skill_name=item.name,
skill_path=item.path,
disclosure_level=item.disclosure_level,
),
)
resolved.append(item)
self.skills = resolved if resolved else None
def _is_any_available_memory(self) -> bool:
"""Check if unified memory is available (agent or crew)."""
if getattr(self, "memory", None):
return True
if self.crew and getattr(self.crew, "_memory", None):
return True
return False
def _supports_native_tool_calling(self, tools: list[BaseTool]) -> bool:
"""Check if the LLM supports native function calling with the given tools.
Args:
tools: List of tools to check against.
Returns:
True if native function calling is supported and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling() # type: ignore[union-attr]
and len(tools) > 0
)
def _prepare_task_execution(
self,
task: Task,
context: str | None,
) -> str:
"""Prepare common setup for task execution shared by sync and async paths.
Handles reasoning, date injection, prompt building, and memory retrieval.
Args:
task: Task to execute.
context: Context to execute the task in.
Returns:
The task prompt after memory retrieval, ready for knowledge lookup.
"""
get_env_context()
self._inject_date_to_task(task)
if self.tools_handler:
self.tools_handler.last_used_tool = None
task_prompt = task.prompt()
task_prompt = build_task_prompt_with_schema(task, task_prompt)
task_prompt = format_task_with_context(task_prompt, context)
return self._retrieve_memory_context(task, task_prompt)
def _finalize_task_prompt(
self,
task_prompt: str,
tools: list[BaseTool] | None,
task: Task,
) -> str:
"""Apply skill context, tool preparation, and training data to the task prompt.
Args:
task_prompt: The task prompt after memory and knowledge retrieval.
tools: Tools to use for the task.
task: Task to execute.
Returns:
The fully prepared task prompt.
"""
prepare_tools(self, tools, task)
return apply_training_data(self, task_prompt)
def _retrieve_memory_context(self, task: Task, task_prompt: str) -> str:
"""Retrieve memory context and append it to the task prompt.
Args:
task: The task being executed.
task_prompt: The current task prompt.
Returns:
The task prompt, potentially augmented with memory context.
"""
if not self._is_any_available_memory():
return task_prompt
crewai_event_bus.emit(
self,
event=MemoryRetrievalStartedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
from_agent=self,
from_task=task,
),
)
start_time = time.time()
memory = ""
try:
unified_memory = getattr(self, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
if unified_memory is not None:
query = task.description
matches = unified_memory.recall(query, limit=5)
if matches:
memory = "Relevant memories:\n" + "\n".join(
m.format() for m in matches
)
if memory.strip() != "":
task_prompt += I18N_DEFAULT.slice("memory").format(memory=memory)
crewai_event_bus.emit(
self,
event=MemoryRetrievalCompletedEvent(
task_id=str(task.id) if task else None,
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
from_agent=self,
from_task=task,
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryRetrievalFailedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
from_agent=self,
from_task=task,
error=str(e),
),
)
return task_prompt
def _finalize_task_execution(self, task: Task, result: Any) -> Any:
"""Finalize task execution with RPM cleanup, tool processing, and event emission.
Args:
task: The task that was executed.
result: The raw execution result.
Returns:
The processed result.
"""
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
result = process_tool_results(self, result)
output_for_event = result
if (
AgentResponseProtocol is not None
and isinstance(result, BaseModel)
and isinstance(result, AgentResponseProtocol)
):
output_for_event = str(result.message)
elif not isinstance(result, str):
output_for_event = str(result)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(
agent=self, task=task, output=output_for_event
),
)
save_last_messages(self)
self._cleanup_mcp_clients()
return result
def _check_execution_error(self, e: Exception, task: Task) -> None:
"""Check if an execution error should be re-raised immediately.
Args:
e: The exception that occurred.
task: The task being executed.
Raises:
Exception: If the error is from litellm, a passthrough, or retries are exhausted.
"""
if e.__class__.__module__.startswith("litellm"):
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
if isinstance(e, _passthrough_exceptions):
raise
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
def _handle_execution_error(
self,
e: Exception,
task: Task,
context: str | None,
tools: list[BaseTool] | None,
) -> Any:
"""Handle execution errors with retry logic (sync path).
Args:
e: The exception that occurred.
task: The task being executed.
context: Task context.
tools: Task tools.
Returns:
Result from retried execution.
"""
self._check_execution_error(e, task)
return self.execute_task(task, context, tools)
async def _handle_execution_error_async(
self,
e: Exception,
task: Task,
context: str | None,
tools: list[BaseTool] | None,
) -> Any:
"""Handle execution errors with retry logic (async path).
Args:
e: The exception that occurred.
task: The task being executed.
context: Task context.
tools: Task tools.
Returns:
Result from retried execution.
"""
self._check_execution_error(e, task)
return await self.aexecute_task(task, context, tools)
def message(self, content: str, **kwargs: Any) -> str:
"""Send a single message and get a response.
Creates a temporary Task + Crew, executes, and returns the raw output.
"""
from crewai.crew import Crew
from crewai.task import Task
from crewai.types.streaming import CrewStreamingOutput
task = Task(
description=content,
expected_output="Respond to the user's message appropriately.",
agent=self,
)
crew = Crew(
agents=[self],
tasks=[task],
verbose=self.verbose,
memory=self.memory or False,
)
result = crew.kickoff()
if isinstance(result, CrewStreamingOutput):
return result.result.raw
return result.raw
def execute_task(
self,
task: Task,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Any:
"""Execute a task with the agent.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent
Raises:
TimeoutError: If execution exceeds the maximum execution time.
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
task_prompt = self._prepare_task_execution(task, context)
knowledge_config = get_knowledge_config(self)
task_prompt = handle_knowledge_retrieval(
self,
task,
task_prompt,
knowledge_config,
self.knowledge.query if self.knowledge else lambda *a, **k: None,
self.crew.query_knowledge
if self.crew and not isinstance(self.crew, str)
else lambda *a, **k: None,
)
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = self._execute_with_timeout(
task_prompt, task, self.max_execution_time
)
else:
result = self._execute_without_timeout(task_prompt, task)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = self._handle_execution_error(e, task, context, tools)
return self._finalize_task_execution(task, result)
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
"""Execute a task with a timeout.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
timeout: Maximum execution time in seconds.
Returns:
The output of the agent.
Raises:
TimeoutError: If execution exceeds the timeout.
RuntimeError: If execution fails for other reasons.
"""
ctx = contextvars.copy_context()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
ctx.run,
self._execute_without_timeout,
task_prompt=task_prompt,
task=task,
)
try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError as e:
future.cancel()
raise TimeoutError(
f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
) from e
except Exception as e:
future.cancel()
raise RuntimeError(f"Task execution failed: {e!s}") from e
def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any:
"""Execute a task without a timeout.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
Returns:
The output of the agent.
"""
if not self.agent_executor:
raise RuntimeError("Agent executor is not initialized.")
invoke_result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
)
if inspect.isawaitable(invoke_result):
invoke_result.close()
raise RuntimeError(
"Agent execution was invoked synchronously from within a running "
"event loop. Use `agent.kickoff_async()` / `crew.kickoff_async()` "
"(or `await agent.aexecute_task(...)`) when calling from async code."
)
return invoke_result["output"]
async def aexecute_task(
self,
task: Task,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Any:
"""Execute a task with the agent asynchronously.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent.
Raises:
TimeoutError: If execution exceeds the maximum execution time.
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
task_prompt = self._prepare_task_execution(task, context)
knowledge_config = get_knowledge_config(self)
task_prompt = await ahandle_knowledge_retrieval(
self, task, task_prompt, knowledge_config
)
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = await self._aexecute_with_timeout(
task_prompt, task, self.max_execution_time
)
else:
result = await self._aexecute_without_timeout(task_prompt, task)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = await self._handle_execution_error_async(e, task, context, tools)
return self._finalize_task_execution(task, result)
async def _aexecute_with_timeout(
self, task_prompt: str, task: Task, timeout: int
) -> Any:
"""Execute a task with a timeout asynchronously.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
timeout: Maximum execution time in seconds.
Returns:
The output of the agent.
Raises:
TimeoutError: If execution exceeds the timeout.
RuntimeError: If execution fails for other reasons.
"""
try:
return await asyncio.wait_for(
self._aexecute_without_timeout(task_prompt, task),
timeout=timeout,
)
except asyncio.TimeoutError as e:
raise TimeoutError(
f"Task '{task.description}' execution timed out after {timeout} seconds. "
"Consider increasing max_execution_time or optimizing the task."
) from e
async def _aexecute_without_timeout(self, task_prompt: str, task: Task) -> Any:
"""Execute a task without a timeout asynchronously.
Args:
task_prompt: The prompt to send to the agent.
task: The task being executed.
Returns:
The output of the agent.
"""
if not self.agent_executor:
raise RuntimeError("Agent executor is not initialized.")
result = await self.agent_executor.ainvoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
}
)
return result["output"]
def _build_execution_prompt(
self, raw_tools: list[BaseTool]
) -> tuple[
SystemPromptResult | StandardPromptResult, list[str], Callable[[], bool] | None
]:
"""Build the execution prompt, stop words, and RPM limit function.
Args:
raw_tools: The raw tools available to the agent.
Returns:
A tuple of (prompt, stop_words, rpm_limit_fn).
"""
use_native_tool_calling = self._supports_native_tool_calling(raw_tools)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
stop_words = [I18N_DEFAULT.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
return prompt, stop_words, rpm_limit_fn
def create_agent_executor(
self, tools: list[BaseTool] | None = None, task: Task | None = None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt, stop_words, rpm_limit_fn = self._build_execution_prompt(raw_tools)
if self.agent_executor is not None:
self._update_executor_parameters(
task=task,
tools=parsed_tools,
raw_tools=raw_tools,
prompt=prompt,
stop_words=stop_words,
rpm_limit_fn=rpm_limit_fn,
)
else:
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before creating agent executor."
)
self.agent_executor = self.executor_class(
llm=self.llm,
task=task,
agent=self,
crew=self.crew,
tools=parsed_tools,
prompt=prompt,
original_tools=raw_tools,
stop_words=stop_words,
max_iter=self.max_iter,
tools_handler=self.tools_handler,
tools_names=get_tool_names(parsed_tools),
tools_description=render_text_description_and_args(parsed_tools),
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=(
task.response_model or task.output_pydantic or task.output_json
)
if task
else None,
)
def _update_executor_parameters(
self,
task: Task | None,
tools: list[CrewStructuredTool],
raw_tools: list[BaseTool],
prompt: SystemPromptResult | StandardPromptResult,
stop_words: list[str],
rpm_limit_fn: Callable | None, # type: ignore[type-arg]
) -> None:
"""Update executor parameters without recreating instance.
Args:
task: Task to execute.
tools: Parsed tools.
raw_tools: Original tools.
prompt: Generated prompt.
stop_words: Stop words list.
rpm_limit_fn: RPM limit callback function.
"""
if self.agent_executor is None:
raise RuntimeError("Agent executor is not initialized.")
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before updating agent executor parameters."
)
if task is not None:
self.agent_executor.task = task
self.agent_executor.llm = self.llm
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt
if isinstance(self.agent_executor, AgentExecutor):
self.agent_executor.stop_words = stop_words
else:
self.agent_executor.stop = stop_words
self.agent_executor.tools_names = get_tool_names(tools)
self.agent_executor.tools_description = render_text_description_and_args(tools)
self.agent_executor.response_model = (
(task.response_model or task.output_pydantic or task.output_json)
if task
else None
)
self.agent_executor.tools_handler = self.tools_handler
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
def get_delegation_tools(self, agents: Sequence[BaseAgent]) -> list[BaseTool]:
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()
def get_platform_tools(self, apps: list[PlatformAppOrAction]) -> list[BaseTool]:
try:
from crewai_tools import (
CrewaiPlatformTools,
)
return CrewaiPlatformTools(apps=apps)
except Exception as e:
self._logger.log("error", f"Error getting platform tools: {e!s}")
return []
def get_mcp_tools(self, mcps: list[str | MCPServerConfig]) -> list[BaseTool]:
"""Convert MCP server references/configs to CrewAI tools.
Delegates to :class:`~crewai.mcp.tool_resolver.MCPToolResolver`.
"""
self._cleanup_mcp_clients()
from crewai.mcp.tool_resolver import MCPToolResolver
self._mcp_resolver = MCPToolResolver(agent=self, logger=self._logger)
return self._mcp_resolver.resolve(mcps)
def _cleanup_mcp_clients(self) -> None:
"""Cleanup MCP client connections after task execution."""
if self._mcp_resolver is not None:
self._mcp_resolver.cleanup()
self._mcp_resolver = None
@staticmethod
def get_multimodal_tools() -> Sequence[BaseTool]:
"""Return tools for multimodal agent capabilities."""
from crewai.tools.agent_tools.add_image_tool import AddImageTool
return [AddImageTool()]
def get_code_execution_tools(self) -> list[Any]:
"""Deprecated: CodeInterpreterTool is no longer available."""
warnings.warn(
"CodeInterpreterTool is no longer available. "
"Use dedicated sandbox services like E2B or Modal.",
DeprecationWarning,
stacklevel=2,
)
return []
@staticmethod
def get_output_converter(
llm: BaseLLM, text: str, model: type[BaseModel], instructions: str
) -> Converter:
"""Create a Converter instance for transforming LLM output to a structured model."""
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += (
"\n\nYou MUST follow these instructions: \n "
+ "\n - ".join(human_feedbacks)
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
crew_trained_agents_file = (
getattr(self.crew, "trained_agents_file", None)
if self.crew and not isinstance(self.crew, str)
else None
)
trained_file = (
os.fspath(crew_trained_agents_file)
if crew_trained_agents_file
else os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV, TRAINED_AGENTS_DATA_FILE)
)
if data := CrewTrainingHandler(trained_file).load():
if trained_data_output := data.get(self.role):
task_prompt += (
"\n\nYou MUST follow these instructions: \n - "
+ "\n - ".join(trained_data_output["suggestions"])
)
return task_prompt
@staticmethod
def _render_text_description(tools: list[Any]) -> str:
"""Render the tool name and description in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search
calculator: This tool is used for math
"""
return "\n".join(
[
f"Tool name: {sanitize_tool_name(tool.name)}\nTool description:\n{tool.description}"
for tool in tools
]
)
def _inject_date_to_task(self, task: Task) -> None:
"""Inject the current date into the task description if inject_date is enabled."""
if self.inject_date:
try:
valid_format_codes = [
"%Y",
"%m",
"%d",
"%H",
"%M",
"%S",
"%B",
"%b",
"%A",
"%a",
]
is_valid = any(code in self.date_format for code in valid_format_codes)
if not is_valid:
raise ValueError(f"Invalid date format: {self.date_format}")
current_date = datetime.now().strftime(self.date_format)
task.description += f"\n\nCurrent Date: {current_date}"
except Exception as e:
self._logger.log("warning", f"Failed to inject date: {e!s}")
def _validate_docker_installation(self) -> None:
"""Deprecated: No-op. CodeInterpreterTool is no longer available."""
warnings.warn(
"CodeInterpreterTool is no longer available. "
"Use dedicated sandbox services like E2B or Modal.",
DeprecationWarning,
stacklevel=2,
)
return
def __repr__(self) -> str:
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@property
def fingerprint(self) -> Fingerprint:
"""
Get the agent's fingerprint.
Returns:
Fingerprint: The agent's fingerprint
"""
return self.security_config.fingerprint
def set_fingerprint(self, fingerprint: Fingerprint) -> None:
"""Set the agent's security fingerprint."""
self.security_config.fingerprint = fingerprint
@property
def last_messages(self) -> list[LLMMessage]:
"""Get messages from the last task execution.
Returns:
List of LLM messages from the most recent task execution.
"""
return self._last_messages
def _get_knowledge_search_query(self, task_prompt: str, task: Task) -> str | None:
"""Generate a search query for the knowledge base based on the task description."""
crewai_event_bus.emit(
self,
event=KnowledgeQueryStartedEvent(
task_prompt=task_prompt,
from_task=task,
from_agent=self,
),
)
query = I18N_DEFAULT.slice("knowledge_search_query").format(
task_prompt=task_prompt
)
rewriter_prompt = I18N_DEFAULT.slice("knowledge_search_query_system_prompt")
if not isinstance(self.llm, BaseLLM):
self._logger.log(
"warning",
f"Knowledge search query failed: LLM for agent '{self.role}' is not an instance of BaseLLM",
)
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
error="LLM is not compatible with knowledge search queries",
from_task=task,
from_agent=self,
),
)
return None
try:
messages: list[LLMMessage] = [
{"role": "system", "content": rewriter_prompt},
{"role": "user", "content": query},
]
rewritten_query = self.llm.call(messages)
crewai_event_bus.emit(
self,
event=KnowledgeQueryCompletedEvent(
query=query,
from_task=task,
from_agent=self,
),
)
return rewritten_query
except Exception as e:
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
error=str(e),
from_task=task,
from_agent=self,
),
)
return None
def _prepare_kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> tuple[AgentExecutor, dict[str, Any], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
This method handles all the common preparation logic shared between
kickoff() and kickoff_async(), including tool processing, prompt building,
executor creation, and input formatting.
Args:
messages: Either a string query or a list of message dictionaries.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Returns:
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
"""
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools:
if self.tools is None:
self.tools = []
self.tools.extend(platform_tools)
if self.mcps:
mcps = self.get_mcp_tools(self.mcps)
if mcps:
if self.tools is None:
self.tools = []
self.tools.extend(mcps)
raw_tools: list[BaseTool] = self.tools or []
agent_memory = getattr(self, "memory", None)
if agent_memory is not None:
from crewai.tools.memory_tools import create_memory_tools
existing_names = {sanitize_tool_name(t.name) for t in raw_tools}
raw_tools.extend(
mt
for mt in create_memory_tools(agent_memory)
if sanitize_tool_name(mt.name) not in existing_names
)
parsed_tools = parse_tools(raw_tools)
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
prompt, stop_words, rpm_limit_fn = self._build_execution_prompt(raw_tools)
if _is_resuming_agent_executor(self.agent_executor):
executor = self.agent_executor
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before resuming agent executor."
)
executor.llm = self.llm
executor.tools = parsed_tools
executor.tools_names = get_tool_names(parsed_tools)
executor.tools_description = render_text_description_and_args(parsed_tools)
executor.original_tools = raw_tools
executor.prompt = prompt
executor.response_model = response_format
executor.stop_words = stop_words
executor.tools_handler = self.tools_handler
executor.step_callback = self.step_callback
executor.function_calling_llm = cast(
BaseLLM | None, self.function_calling_llm
)
executor.respect_context_window = self.respect_context_window
executor.request_within_rpm_limit = rpm_limit_fn
executor.callbacks = [TokenCalcHandler(self._token_process)]
else:
executor = AgentExecutor(
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
)
all_files: dict[str, Any] = {}
if isinstance(messages, str):
formatted_messages = messages
else:
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
for msg in messages:
if msg.get("files"):
all_files.update(msg["files"])
if input_files:
all_files.update(input_files)
if agent_memory is not None:
try:
crewai_event_bus.emit(
self,
event=MemoryRetrievalStartedEvent(
task_id=None,
source_type="agent_kickoff",
from_agent=self,
),
)
start_time = time.time()
matches = agent_memory.recall(formatted_messages, limit=20)
memory_block = ""
if matches:
memory_block = "Relevant memories:\n" + "\n".join(
m.format() for m in matches
)
if memory_block:
formatted_messages += "\n\n" + I18N_DEFAULT.slice("memory").format(
memory=memory_block
)
crewai_event_bus.emit(
self,
event=MemoryRetrievalCompletedEvent(
task_id=None,
memory_content=memory_block,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent_kickoff",
from_agent=self,
),
)
except Exception as e:
crewai_event_bus.emit(
self,
event=MemoryRetrievalFailedEvent(
task_id=None,
source_type="agent_kickoff",
from_agent=self,
error=str(e),
),
)
inputs: dict[str, Any] = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
if all_files:
inputs["files"] = all_files
return executor, inputs, agent_info, parsed_tools
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, guardrails, and file inputs.
When called from within a Flow (sync or async method), this automatically
detects the event loop and returns a coroutine that the Flow framework
awaits. Users don't need to handle async explicitly.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
Messages can include a 'files' field with file inputs.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Files can be paths, bytes, or File objects from crewai_files.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the agent resumes from that checkpoint. Remaining
config fields enable checkpointing for the run.
Returns:
LiteAgentOutput: The result of the agent execution.
When inside a Flow, returns a coroutine that resolves to LiteAgentOutput.
Note:
For explicit async usage outside of Flow, use kickoff_async() directly.
"""
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return restored.kickoff( # type: ignore[no-any-return]
messages=messages,
response_format=response_format,
input_files=input_files,
)
if is_inside_event_loop():
return self.kickoff_async(messages, response_format, input_files)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format, input_files
)
try:
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
output = self._execute_and_build_output(executor, inputs, response_format)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
except Exception as e:
self._emit_kickoff_error(agent_info, e)
def _finalize_kickoff(
self,
output: LiteAgentOutput,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None,
messages: str | list[LLMMessage],
agent_info: dict[str, Any],
) -> LiteAgentOutput:
"""Apply guardrails, save to memory, and emit completion event.
Args:
output: The execution output.
executor: The agent executor.
inputs: The execution inputs.
response_format: Optional response format.
messages: The original messages.
agent_info: Agent metadata for events.
Returns:
The finalized output.
"""
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
self._save_kickoff_to_memory(messages, output.raw)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
def _emit_kickoff_error(self, agent_info: dict[str, Any], e: Exception) -> NoReturn:
"""Emit a kickoff error event and re-raise."""
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise e
def _save_kickoff_to_memory(
self, messages: str | list[LLMMessage], output_text: str
) -> None:
"""Save kickoff result to memory. No-op if agent has no memory."""
agent_memory = getattr(self, "memory", None)
if agent_memory is None:
return
try:
if isinstance(messages, str):
input_str = messages
else:
input_str = (
"\n".join(
str(msg.get("content", ""))
for msg in messages
if msg.get("content")
)
or "User request"
)
raw = f"Input: {input_str}\nAgent: {self.role}\nResult: {output_text}"
extracted = agent_memory.extract_memories(raw)
if extracted:
agent_memory.remember_many(extracted)
except Exception as e:
self._logger.log("error", f"Failed to save kickoff result to memory: {e}")
def _build_output_from_result(
self,
result: dict[str, Any],
executor: AgentExecutor,
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Build a LiteAgentOutput from an executor result dict.
Shared logic used by both sync and async execution paths.
Args:
result: The result dictionary from executor.invoke / invoke_async.
executor: The executor instance.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
output = result.get("output", "")
formatted_result: BaseModel | None = None
raw_output: str
if isinstance(output, BaseModel):
formatted_result = output
raw_output = output.model_dump_json()
elif response_format:
raw_output = str(output) if not isinstance(output, str) else output
try:
formatted_result = response_format.model_validate_json(raw_output)
except ValidationError:
# Direct JSON validation failed; fall back to converter-based parsing below.
formatted_result = None
if formatted_result is None:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = I18N_DEFAULT.slice(
"formatted_task_instructions"
).format(output_format=schema)
converter = Converter(
llm=cast(BaseLLM, self.llm),
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
# Conversion failure is non-fatal; raw output is preserved below.
pass
else:
raw_output = str(output) if not isinstance(output, str) else output
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
raw_str = (
raw_output
if isinstance(raw_output, str)
else raw_output.model_dump_json()
if isinstance(raw_output, BaseModel)
else str(raw_output)
)
todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
return LiteAgentOutput(
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=list(executor.state.messages),
plan=executor.state.plan,
todos=todo_results,
replan_count=executor.state.replan_count,
last_replan_reason=executor.state.last_replan_reason,
)
def _execute_and_build_output(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent synchronously and build the output object."""
result = cast(dict[str, Any], executor.invoke(inputs))
return self._build_output_from_result(result, executor, response_format)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object."""
result = await executor.invoke_async(inputs)
return self._build_output_from_result(result, executor, response_format)
def _process_kickoff_guardrail(
self,
output: LiteAgentOutput,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
retry_count: int = 0,
) -> LiteAgentOutput:
"""Process guardrail for kickoff execution with retry logic.
Args:
output: Current agent output.
executor: The executor instance.
inputs: Input dictionary for re-execution.
response_format: Optional response format.
retry_count: Current retry count.
Returns:
Validated/updated output.
"""
guardrail_callable: GuardrailCallable
if isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
guardrail_callable = cast(
GuardrailCallable,
LLMGuardrail(description=self.guardrail, llm=cast(BaseLLM, self.llm)),
)
elif callable(self.guardrail):
guardrail_callable = self.guardrail
else:
return output
guardrail_result = process_guardrail(
output=output,
guardrail=guardrail_callable,
retry_count=retry_count,
event_source=self,
from_agent=self,
)
if not guardrail_result.success:
if retry_count >= self.guardrail_max_retries:
raise ValueError(
f"Agent's guardrail failed validation after {self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
executor._append_message_to_state(
guardrail_result.error or "Guardrail validation failed",
role="user",
)
output = self._execute_and_build_output(executor, inputs, response_format)
return self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
retry_count=retry_count + 1,
)
if guardrail_result.result is not None:
if isinstance(guardrail_result.result, str):
output.raw = guardrail_result.result
elif isinstance(guardrail_result.result, BaseModel):
output.pydantic = guardrail_result.result
return output
async def kickoff_async(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously with the given messages.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
called from within an async Flow method.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
Messages can include a 'files' field with file inputs.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
Files can be paths, bytes, or File objects from crewai_files.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the agent resumes from that checkpoint.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return await restored.kickoff_async( # type: ignore[no-any-return]
messages=messages,
response_format=response_format,
input_files=input_files,
)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format, input_files
)
try:
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
except Exception as e:
self._emit_kickoff_error(agent_info, e)
async def akickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
) -> LiteAgentOutput:
"""Async version of kickoff. Alias for kickoff_async.
Args:
messages: Either a string query or a list of message dictionaries.
response_format: Optional Pydantic model for structured output.
input_files: Optional dict of named files to attach to the message.
from_checkpoint: Optional checkpoint config. If ``restore_from``
is set, the agent resumes from that checkpoint.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await self.kickoff_async(
messages, response_format, input_files, from_checkpoint
)