This commit is contained in:
lorenzejay
2026-05-26 09:15:37 -07:00
56 changed files with 720 additions and 447 deletions

View File

@@ -8,7 +8,6 @@ from crewai_cli.utils import copy_template
def add_crew_to_flow(crew_name: str) -> None:
"""Add a new crew to the current flow."""
# Check if pyproject.toml exists in the current directory
if not Path("pyproject.toml").exists():
PRINTER.print(
"This command must be run from the root of a flow project.", color="red"
@@ -17,7 +16,6 @@ def add_crew_to_flow(crew_name: str) -> None:
"This command must be run from the root of a flow project."
)
# Determine the flow folder based on the current directory
flow_folder = Path.cwd()
crews_folder = flow_folder / "src" / flow_folder.name / "crews"
@@ -25,7 +23,6 @@ def add_crew_to_flow(crew_name: str) -> None:
PRINTER.print("Crews folder does not exist in the current flow.", color="red")
raise click.ClickException("Crews folder does not exist in the current flow.")
# Create the crew within the flow's crews directory
create_embedded_crew(crew_name, parent_folder=crews_folder)
click.echo(
@@ -51,13 +48,12 @@ def create_embedded_crew(crew_name: str, parent_folder: Path) -> None:
click.secho(f"Creating crew {folder_name}...", fg="green", bold=True)
crew_folder.mkdir(parents=True)
# Create config and crew.py files
config_folder = crew_folder / "config"
config_folder.mkdir(exist_ok=True)
templates_dir = Path(__file__).parent / "templates" / "crew"
config_template_files = ["agents.yaml", "tasks.yaml"]
crew_template_file = f"{folder_name}.py" # Updated file name
crew_template_file = f"{folder_name}.py"
for file_name in config_template_files:
src_file = templates_dir / "config" / file_name

View File

@@ -222,9 +222,6 @@ def _entity_summary(entities: list[dict[str, Any]]) -> str:
return ", ".join(parts) if parts else "empty"
# --- JSON directory ---
def _list_json(location: str) -> list[dict[str, Any]]:
pattern = os.path.join(location, "**", "*.json")
results = []
@@ -275,9 +272,6 @@ def _info_json_file(path: str) -> dict[str, Any]:
return meta
# --- SQLite ---
def _list_sqlite(db_path: str) -> list[dict[str, Any]]:
results = []
with sqlite3.connect(db_path) as conn:
@@ -327,9 +321,6 @@ def _info_sqlite_id(db_path: str, checkpoint_id: str) -> dict[str, Any] | None:
return meta
# --- Public API ---
def list_checkpoints(location: str) -> None:
"""List all checkpoints at a location."""
if _is_sqlite(location):
@@ -367,7 +358,6 @@ def info_checkpoint(path: str) -> None:
"""Show details of a single checkpoint."""
meta: dict[str, Any] | None = None
# db_path#checkpoint_id format
if "#" in path:
db_path, checkpoint_id = path.rsplit("#", 1)
if _is_sqlite(db_path):
@@ -376,7 +366,6 @@ def info_checkpoint(path: str) -> None:
click.echo(f"Checkpoint not found: {checkpoint_id}")
return
# SQLite file — show latest
if meta is None and _is_sqlite(path):
meta = _info_sqlite_latest(path)
if not meta:
@@ -384,7 +373,6 @@ def info_checkpoint(path: str) -> None:
return
click.echo(f"Latest checkpoint: {meta['name']}\n")
# Directory — show latest JSON
if meta is None and os.path.isdir(path):
meta = _info_json_latest(path)
if not meta:
@@ -392,7 +380,6 @@ def info_checkpoint(path: str) -> None:
return
click.echo(f"Latest checkpoint: {meta['name']}\n")
# Specific JSON file
if meta is None and os.path.isfile(path):
try:
meta = _info_json_file(path)

View File

@@ -320,8 +320,6 @@ class CheckpointTUI(App[_TuiResult]):
self._refresh_tree()
self.query_one("#tree-panel", Tree).root.expand()
# ── Tree building ──────────────────────────────────────────────
@staticmethod
def _top_level_entity(entry: dict[str, Any]) -> tuple[str, str]:
etype, ename = "unknown", ""
@@ -473,8 +471,6 @@ class CheckpointTUI(App[_TuiResult]):
self.sub_title = self._location
self.query_one("#status", Static).update(f" {count} checkpoint(s) | {storage}")
# ── Detail panel ───────────────────────────────────────────────
async def _clear_scroll(self, tab_id: str) -> VerticalScroll:
tab = self.query_one(f"#{tab_id}", TabPane)
scroll = tab.query_one(VerticalScroll)
@@ -661,8 +657,6 @@ class CheckpointTUI(App[_TuiResult]):
)
await scroll.mount(row)
# ── Data collection ────────────────────────────────────────────
def _collect_inputs(self) -> dict[str, Any] | None:
if not self._input_keys:
return None
@@ -699,8 +693,6 @@ class CheckpointTUI(App[_TuiResult]):
return f"{self._location}#{entry['name']}"
return str(entry.get("name", ""))
# ── Events ─────────────────────────────────────────────────────
async def on_tree_node_highlighted(
self, event: Tree.NodeHighlighted[dict[str, Any]]
) -> None:

View File

@@ -42,7 +42,6 @@ from crewai_cli.utils import build_env_with_all_tool_credentials, read_toml
def _get_cli_version() -> str:
"""Return the best available version string for the CLI."""
# Prefer crewai version if installed (keeps existing UX)
try:
return get_version("crewai")
except Exception: # noqa: S110
@@ -67,7 +66,6 @@ def crewai() -> None:
def uv(uv_args: tuple[str, ...]) -> None:
"""A wrapper around uv commands that adds custom tool authentication through env vars."""
try:
# Verify pyproject.toml exists first
read_toml()
except FileNotFoundError as e:
raise SystemExit(
@@ -321,7 +319,6 @@ def memory(
)
raise SystemExit(1) from exc
# Build embedder spec from CLI flags.
embedder_spec: dict[str, Any] | None = None
if embedder_config:
import json as _json
@@ -435,7 +432,6 @@ def logout(reset: bool) -> None:
click.echo("Successfully logged out from CrewAI AMP.")
# DEPLOY CREWAI+ COMMANDS
@crewai.group()
def deploy() -> None:
"""Deploy the Crew CLI group."""
@@ -766,17 +762,14 @@ def env_view() -> None:
console = Console()
# Check for .env file
env_file = Path(".env")
env_file_exists = env_file.exists()
# Create table for environment variables
table = Table(show_header=True, header_style="bold cyan", expand=True)
table.add_column("Environment Variable", style="cyan", width=30)
table.add_column("Value", style="white", width=20)
table.add_column("Source", style="yellow", width=20)
# Check CREWAI_TRACING_ENABLED
crewai_tracing = os.getenv("CREWAI_TRACING_ENABLED", "")
if crewai_tracing:
table.add_row(
@@ -791,7 +784,6 @@ def env_view() -> None:
"[dim]—[/dim]",
)
# Check other related env vars
crewai_testing = os.getenv("CREWAI_TESTING", "")
if crewai_testing:
table.add_row("CREWAI_TESTING", crewai_testing, "Environment/Shell")
@@ -804,7 +796,6 @@ def env_view() -> None:
if crewai_org_id:
table.add_row("CREWAI_ORG_ID", crewai_org_id, "Environment/Shell")
# Check if .env file exists
table.add_row(
".env file",
"✅ Found" if env_file_exists else "❌ Not found",
@@ -820,7 +811,6 @@ def env_view() -> None:
console.print("\n")
console.print(panel)
# Show helpful message
if env_file_exists:
console.print(
"\n[dim]💡 Tip: To enable tracing via .env, add: CREWAI_TRACING_ENABLED=true[/dim]"
@@ -896,11 +886,9 @@ def traces_status() -> None:
table.add_column("Setting", style="cyan")
table.add_column("Value", style="white")
# Check environment variable
env_enabled = os.getenv("CREWAI_TRACING_ENABLED", "false")
table.add_row("CREWAI_TRACING_ENABLED", env_enabled)
# Check user consent
trace_consent = user_data.get("trace_consent")
if trace_consent is True:
consent_status = "✅ Enabled (user consented)"
@@ -910,7 +898,6 @@ def traces_status() -> None:
consent_status = "⚪ Not set (first-time user)"
table.add_row("User Consent", consent_status)
# Check overall status
if is_tracing_enabled():
overall_status = "✅ ENABLED"
border_style = "green"

View File

@@ -50,7 +50,6 @@ def create_folder_structure(
folder_name = name.replace(" ", "_").replace("-", "_").lower()
folder_name = re.sub(r"[^a-zA-Z0-9_]", "", folder_name)
# Check if the name starts with invalid characters or is primarily invalid
if re.match(r"^[^a-zA-Z0-9_-]+", name):
raise ValueError(
f"Project name '{name}' contains no valid characters for a Python module name"
@@ -98,7 +97,6 @@ def create_folder_structure(
f"Project name '{name}' would generate class name '{class_name}' which cannot start with a digit"
)
# Check if the original name (before title casing) is a keyword
original_name_clean = re.sub(
r"[^a-zA-Z0-9_]", "", name.replace("_", "").replace("-", "").lower()
)
@@ -128,7 +126,7 @@ def create_folder_structure(
click.secho("Operation cancelled.", fg="yellow")
sys.exit(0)
click.secho(f"Overriding folder {folder_name}...", fg="green", bold=True)
shutil.rmtree(folder_path) # Delete the existing folder and its contents
shutil.rmtree(folder_path)
click.secho(
f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...",
@@ -144,7 +142,6 @@ def create_folder_structure(
(folder_path / "src" / folder_name / "tools").mkdir(parents=True)
(folder_path / "src" / folder_name / "config").mkdir(parents=True)
# Copy AGENTS.md to project root (top-level projects only)
package_dir = Path(__file__).parent
agents_md_src = package_dir / "templates" / "AGENTS.md"
if agents_md_src.exists():
@@ -232,25 +229,22 @@ def create_crew(
while True:
selected_provider = select_provider(provider_models)
if selected_provider is None: # User typed 'q'
if selected_provider is None:
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_provider and isinstance(
selected_provider, str
): # Valid selection
if selected_provider and isinstance(selected_provider, str):
break
click.secho(
"No provider selected. Please try again or press 'q' to exit.", fg="red"
)
# Check if the selected provider has predefined models
if MODELS.get(selected_provider):
while True:
selected_model = select_model(selected_provider, provider_models)
if selected_model is None: # User typed 'q'
if selected_model is None:
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_model: # Valid selection
if selected_model:
break
click.secho(
"No model selected. Please try again or press 'q' to exit.",
@@ -258,17 +252,14 @@ def create_crew(
)
env_vars["MODEL"] = selected_model
# Check if the selected provider requires API keys
if selected_provider in ENV_VARS:
provider_env_vars = ENV_VARS[selected_provider]
for details in provider_env_vars:
if details.get("default", False):
# Automatically add default key-value pairs
for key, value in details.items():
if key not in ["prompt", "key_name", "default"]:
env_vars[key] = value
elif "key_name" in details:
# Prompt for non-default key-value pairs
prompt = details["prompt"]
key_name = details["key_name"]
api_key_value = click.prompt(prompt, default="", show_default=False)

View File

@@ -20,25 +20,21 @@ def create_flow(name: str) -> None:
telemetry = Telemetry()
telemetry.flow_creation_span(class_name)
# Create directory structure
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
(project_root / "src" / folder_name / "tools").mkdir(parents=True)
(project_root / "tests").mkdir(exist_ok=True)
# Create .env file
with open(project_root / ".env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "flow"
# Copy AGENTS.md to project root
agents_md_src = package_dir / "templates" / "AGENTS.md"
if agents_md_src.exists():
shutil.copy2(agents_md_src, project_root / "AGENTS.md")
# List of template files to copy
root_template_files = [".gitignore", "pyproject.toml", "README.md"]
src_template_files = ["__init__.py", "main.py"]
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
@@ -65,25 +61,21 @@ def create_flow(name: str) -> None:
with open(dst_file, "w") as file:
file.write(content)
# Copy and process root template files
for file_name in root_template_files:
src_file = templates_dir / file_name
dst_file = project_root / file_name
process_file(src_file, dst_file)
# Copy and process src template files
for file_name in src_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy tools files
for file_name in tools_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy crew folders
for crew_folder in crew_folders:
src_crew_folder = templates_dir / "crews" / crew_folder
dst_crew_folder = project_root / "src" / folder_name / "crews" / crew_folder

View File

@@ -74,7 +74,6 @@ class ValidationResult:
hint: str = ""
# Maps known provider env var names → label used in hint messages.
_KNOWN_API_KEY_HINTS: dict[str, str] = {
"OPENAI_API_KEY": "OpenAI",
"ANTHROPIC_API_KEY": "Anthropic",

View File

@@ -10,10 +10,9 @@ from textual.containers import Horizontal, Vertical
from textual.widgets import Footer, Header, Input, OptionList, Static, Tree
# -- CrewAI brand palette --
_PRIMARY = "#eb6658" # coral
_SECONDARY = "#1F7982" # teal
_TERTIARY = "#ffffff" # white
_PRIMARY = "#eb6658"
_SECONDARY = "#1F7982"
_TERTIARY = "#ffffff"
def _format_scope_info(info: Any) -> str:
@@ -193,8 +192,6 @@ class MemoryTUI(App[None]):
node = parent_node.add(label, data=child)
self._add_scope_children(node, child, depth + 1, max_depth)
# -- Populating the OptionList -------------------------------------------
def _populate_entry_list(self) -> None:
"""Clear the OptionList and fill it with the current scope's entries."""
option_list = self.query_one("#entry-list", OptionList)
@@ -226,8 +223,6 @@ class MemoryTUI(App[None]):
)
option_list.add_option(label)
# -- Detail rendering ----------------------------------------------------
def _format_record_detail(self, record: Any, context_line: str = "") -> str:
"""Format a full MemoryRecord as Rich markup for the detail view.
@@ -246,7 +241,6 @@ class MemoryTUI(App[None]):
lines.append(context_line)
lines.append("")
# -- Fields block --
lines.append(f"[dim]ID:[/] {record.id}")
lines.append(f"[dim]Scope:[/] [bold]{record.scope}[/]")
lines.append(f"[dim]Importance:[/] [bold]{record.importance:.2f}[/]")
@@ -264,12 +258,10 @@ class MemoryTUI(App[None]):
lines.append(f"[dim]Source:[/] {record.source or '-'}")
lines.append(f"[dim]Private:[/] {'Yes' if record.private else 'No'}")
# -- Content block --
lines.append(f"\n{sep}")
lines.append("[bold]Content[/]\n")
lines.append(record.content)
# -- Metadata block --
if record.metadata:
lines.append(f"\n{sep}")
lines.append("[bold]Metadata[/]\n")
@@ -278,8 +270,6 @@ class MemoryTUI(App[None]):
return "\n".join(lines)
# -- Event handlers ------------------------------------------------------
def on_tree_node_selected(self, event: Tree.NodeSelected[str]) -> None:
"""Load entries for the selected scope and populate the OptionList."""
path = event.node.data if event.node.data is not None else "/"

View File

@@ -68,12 +68,12 @@ def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
provider = select_choice(
"Select a provider to set up:", [*predefined_providers, "other"]
)
if provider is None: # User typed 'q'
if provider is None:
return None
if provider == "other":
provider = select_choice("Select a provider from the full list:", all_providers)
if provider is None: # User typed 'q'
if provider is None:
return None
return provider.lower() if provider else False

View File

@@ -31,7 +31,6 @@ def run_crew(trained_agents_file: str | None = None) -> None:
min_required_version = "0.71.0"
pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -41,14 +40,11 @@ def run_crew(trained_agents_file: str | None = None) -> None:
fg="red",
)
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type, trained_agents_file=trained_agents_file)

View File

@@ -28,10 +28,8 @@ class SettingsCommand(BaseCommand):
table.add_column("Value", style="green")
table.add_column("Description", style="yellow")
# Add all settings to the table
for field_name, field_info in Settings.model_fields.items():
if field_name in HIDDEN_SETTINGS_KEYS:
# Do not display hidden settings
continue
current_value = getattr(self.settings, field_name)
@@ -42,10 +40,8 @@ class SettingsCommand(BaseCommand):
table.add_row(field_name, display_value, description)
# Add trace-related settings from user data
user_data = _load_user_data()
# CREWAI_TRACING_ENABLED environment variable
env_tracing = os.getenv("CREWAI_TRACING_ENABLED", "")
env_tracing_display = env_tracing if env_tracing else "Not set"
table.add_row(
@@ -54,7 +50,6 @@ class SettingsCommand(BaseCommand):
"Environment variable to enable/disable tracing",
)
# Trace consent status
trace_consent = user_data.get("trace_consent")
if trace_consent is True:
consent_display = "✅ Enabled"
@@ -66,7 +61,6 @@ class SettingsCommand(BaseCommand):
"trace_consent", consent_display, "Whether trace collection is enabled"
)
# First execution timestamp
if user_data.get("first_execution_at"):
timestamp = datetime.fromtimestamp(user_data["first_execution_at"])
first_exec_display = timestamp.strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -41,10 +41,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
BaseCommand.__init__(self)
PlusAPIMixin.__init__(self, telemetry=self._telemetry)
# ------------------------------------------------------------------
# create
# ------------------------------------------------------------------
def create(self, name: str, in_project: bool = True) -> None:
"""Scaffold a new skill directory.
@@ -73,10 +69,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
)
console.print(f"Edit [bold]{skill_md}[/bold] to define the skill instructions.")
# ------------------------------------------------------------------
# install
# ------------------------------------------------------------------
def install(self, ref: str) -> None:
"""Download and install a registry skill.
@@ -182,10 +174,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
f"[green]Installed [bold]{ref}[/bold]{' (' + version + ')' if version else ''} to global cache.[/green]"
)
# ------------------------------------------------------------------
# publish
# ------------------------------------------------------------------
def publish(self, is_public: bool, org: str | None, force: bool = False) -> None:
"""Publish the skill in the current directory to the registry."""
skill_md = Path("SKILL.md")
@@ -196,7 +184,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
)
raise SystemExit(1)
# Parse frontmatter to extract name + version
try:
frontmatter = self._parse_frontmatter(skill_md.read_text())
except ValueError as exc:
@@ -257,10 +244,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
f"Monitor status at: {base_url}/crewai_plus/skills/{effective_org}/{name}[/green]"
)
# ------------------------------------------------------------------
# list_cached
# ------------------------------------------------------------------
def list_cached(self) -> None:
"""Show locally installed skills."""
table = Table(title="Installed Skills", show_lines=True)
@@ -269,7 +252,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
table.add_column("Version")
table.add_column("Path")
# Project-local ./skills/
local_skills_dir = Path("skills")
if local_skills_dir.is_dir():
for skill_dir in sorted(local_skills_dir.iterdir()):
@@ -282,7 +264,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
str(skill_dir),
)
# Global cache
cache_root = Path.home() / ".crewai" / "skills"
if cache_root.exists():
for org_dir in sorted(cache_root.iterdir()):
@@ -306,10 +287,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
console.print(table)
# ------------------------------------------------------------------
# internal helpers
# ------------------------------------------------------------------
def _print_current_organization(self) -> None:
settings = Settings()
if settings.org_uuid:
@@ -326,7 +303,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
def _unpack_archive(self, archive_bytes: bytes, dest: Path) -> None:
"""Unpack a .tar.gz or .zip archive into dest."""
# Try tar first, then zip
try:
with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tf:
try:
@@ -337,7 +313,6 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
except tarfile.TarError:
pass
# Fallback: zip
with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
_safe_extract_zip(zf, dest)

View File

@@ -1,9 +1,7 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class {{crew_name}}():
@@ -12,12 +10,6 @@ class {{crew_name}}():
agents: list[BaseAgent]
tasks: list[Task]
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
return Agent(
@@ -32,9 +24,6 @@ class {{crew_name}}():
verbose=True
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
@@ -51,13 +40,9 @@ class {{crew_name}}():
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View File

@@ -8,10 +8,6 @@ from {{folder_name}}.crew import {{crew_name}}
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
def run():
"""

View File

@@ -15,5 +15,4 @@ class MyCustomTool(BaseTool):
args_schema: Type[BaseModel] = MyCustomToolInput
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

View File

@@ -2,10 +2,6 @@ from crewai import Agent, Crew, Process, Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.project import CrewBase, agent, crew, task
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class ContentCrew:
@@ -14,14 +10,9 @@ class ContentCrew:
agents: list[BaseAgent]
tasks: list[Task]
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
# If you would like to add tools to your crew, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def planner(self) -> Agent:
return Agent(
@@ -40,9 +31,6 @@ class ContentCrew:
config=self.agents_config["editor"], # type: ignore[index]
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def planning_task(self) -> Task:
return Task(
@@ -64,12 +52,9 @@ class ContentCrew:
@crew
def crew(self) -> Crew:
"""Creates the Content Crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
verbose=True,
)

View File

@@ -68,7 +68,6 @@ def run_with_trigger():
import json
import sys
# Get trigger payload from command line argument
if len(sys.argv) < 2:
raise Exception("No trigger payload provided. Please provide JSON payload as argument.")
@@ -77,8 +76,6 @@ def run_with_trigger():
except json.JSONDecodeError:
raise Exception("Invalid JSON payload provided as argument")
# Create flow and kickoff with trigger payload
# The @start() methods will automatically receive crewai_trigger_payload parameter
content_flow = ContentFlow()
try:

View File

@@ -17,5 +17,4 @@ class MyCustomTool(BaseTool):
args_schema: Type[BaseModel] = MyCustomToolInput
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

View File

@@ -6,5 +6,4 @@ class {{class_name}}(BaseTool):
description: str = "What this tool does. It's vital for effective utilization."
def _run(self, argument: str) -> str:
# Your tool's logic here
return "Tool's result"

View File

@@ -82,7 +82,6 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
tree_find_and_replace(project_root, "{{folder_name}}", folder_name)
tree_find_and_replace(project_root, "{{class_name}}", class_name)
# Copy AGENTS.md to project root
agents_md_src = Path(__file__).parent.parent / "templates" / "AGENTS.md"
if agents_md_src.exists():
shutil.copy2(agents_md_src, project_root / "AGENTS.md")

View File

@@ -37,7 +37,6 @@ class TriggersCommand(BaseCommand, PlusAPIMixin):
def execute_with_trigger(self, trigger_path: str) -> None:
"""Execute crew with trigger payload."""
try:
# Parse app_slug/trigger_slug
if "/" not in trigger_path:
console.print(
"[bold red]Error: Trigger must be in format 'app_slug/trigger_slug'[/bold red]"
@@ -63,7 +62,6 @@ class TriggersCommand(BaseCommand, PlusAPIMixin):
trigger_data = response.json()
self._display_trigger_info(trigger_data)
# Run crew with trigger payload
self._run_crew_with_payload(trigger_data.get("sample_payload", {}))
except Exception as e:

View File

@@ -21,7 +21,6 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
When the time comes that uv supports the new format, this function will be deprecated.
"""
poetry_data = {}
# Read the input pyproject.toml
pyproject_data = read_toml()
new_pyproject: dict[str, Any] = {
@@ -29,7 +28,6 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
"build-system": {"requires": ["hatchling"], "build-backend": "hatchling.build"},
}
# Migrate project metadata
if "tool" in pyproject_data and "poetry" in pyproject_data["tool"]:
poetry_data = pyproject_data["tool"]["poetry"]
new_pyproject["project"]["name"] = poetry_data.get("name")
@@ -44,18 +42,15 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
]
new_pyproject["project"]["requires-python"] = poetry_data.get("python")
else:
# If it's already in the new format, just copy the project and tool sections
new_pyproject["project"] = pyproject_data.get("project", {})
new_pyproject["tool"] = pyproject_data.get("tool", {})
# Migrate or copy dependencies
if "dependencies" in new_pyproject["project"]:
# If dependencies are already in the new format, keep them as is
pass
elif poetry_data and "dependencies" in poetry_data:
new_pyproject["project"]["dependencies"] = []
for dep, version in poetry_data["dependencies"].items():
if isinstance(version, dict): # Handle extras
if isinstance(version, dict):
extras = ",".join(version.get("extras", []))
new_dep = f"{dep}[{extras}]"
if "version" in version:
@@ -67,7 +62,6 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
new_dep = f"{dep}{parse_version(version)}"
new_pyproject["project"]["dependencies"].append(new_dep)
# Migrate or copy scripts
if poetry_data and "scripts" in poetry_data:
new_pyproject["project"]["scripts"] = poetry_data["scripts"]
elif pyproject_data.get("project", {}) and "scripts" in pyproject_data["project"]:
@@ -79,7 +73,6 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
"run_crew" not in new_pyproject["project"]["scripts"]
and len(new_pyproject["project"]["scripts"]) > 0
):
# Extract the module name from any existing script
existing_scripts = new_pyproject["project"]["scripts"]
module_name = next(
(value.split(".")[0] for value in existing_scripts.values() if "." in value)
@@ -87,15 +80,12 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
new_pyproject["project"]["scripts"]["run_crew"] = f"{module_name}.main:run"
# Migrate optional dependencies
if poetry_data and "extras" in poetry_data:
new_pyproject["project"]["optional-dependencies"] = poetry_data["extras"]
# Backup the old pyproject.toml
backup_file = "pyproject-old.toml"
shutil.copy2(input_file, backup_file)
# Rename the poetry.lock file
lock_file = "poetry.lock"
lock_backup = "poetry-old.lock"
if os.path.exists(lock_file):
@@ -103,7 +93,6 @@ def migrate_pyproject(input_file: str, output_file: str) -> None:
else:
pass
# Write the new pyproject.toml
with open(output_file, "wb") as f:
tomli_w.dump(new_pyproject, f)

View File

@@ -333,7 +333,6 @@ class TestAuthenticationCommand:
@patch("crewai_core.auth.oauth2.httpx.post")
def test_poll_for_token_error(self, mock_post):
"""Test the method to poll for token (error path)."""
# Setup mock to return error
mock_response_error = MagicMock()
mock_response_error.status_code = 400
mock_response_error.json.return_value = {

View File

@@ -12,7 +12,6 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.return_value = {"exp": 1719859200}
# Create signing key object mock with a .key attribute
mock_pyjwkclient.return_value.get_signing_key_from_jwt.return_value = MagicMock(
key="mock_signing_key"
)

View File

@@ -164,7 +164,6 @@ def test_poetry_lock_is_accepted(tmp_path: Path) -> None:
def test_stale_lockfile_warns(tmp_path: Path) -> None:
_scaffold_standard_crew(tmp_path)
# Make lockfile older than pyproject.
lock = tmp_path / "uv.lock"
pyproject = tmp_path / "pyproject.toml"
old_time = pyproject.stat().st_mtime - 60

View File

@@ -41,14 +41,9 @@ def skill_command():
yield cmd
# ---------------------------------------------------------------------------
# create
# ---------------------------------------------------------------------------
class TestSkillCreate:
def test_create_in_project(self, skill_command, tmp_path):
with in_temp_dir():
# Simulate being inside a project
Path("pyproject.toml").write_text("[tool.poetry]\nname = 'test'\n")
skill_command.create("my-skill")
assert Path("skills/my-skill/SKILL.md").exists()
@@ -75,10 +70,6 @@ class TestSkillCreate:
skill_command.create("existing-skill", in_project=False)
# ---------------------------------------------------------------------------
# install
# ---------------------------------------------------------------------------
class TestSkillInstall:
def _zip_skill(self, name: str) -> bytes:
buf = io.BytesIO()
@@ -118,10 +109,6 @@ class TestSkillInstall:
assert Path("skills/my-skill/SKILL.md").exists()
# ---------------------------------------------------------------------------
# publish
# ---------------------------------------------------------------------------
class TestSkillPublish:
def test_publish_no_skill_md(self, skill_command):
with in_temp_dir():
@@ -155,7 +142,6 @@ class TestSkillPublish:
mock_resp.status_code = 200
mock_resp.json.return_value = {}
mock_client.publish_skill.return_value = mock_resp
# No org set → should SystemExit (no org_name in settings)
with patch("crewai_cli.skills.main.Settings") as mock_settings_cls:
mock_settings_cls.return_value.org_name = None
mock_settings_cls.return_value.enterprise_base_url = None
@@ -184,15 +170,10 @@ class TestSkillPublish:
assert call_kwargs.kwargs["version"] == "1.0.0"
# ---------------------------------------------------------------------------
# list_cached
# ---------------------------------------------------------------------------
class TestSkillListCached:
def test_list_cached_empty(self, skill_command, capsys):
with in_temp_dir():
skill_command.list_cached()
# Should not raise
def test_list_cached_shows_project_skills(self, skill_command, capsys):
with in_temp_dir():
@@ -202,4 +183,3 @@ class TestSkillListCached:
"---\nname: my-skill\nversion: 0.5.0\ndescription: A skill.\n---\nBody."
)
skill_command.list_cached()
# Should complete without error

View File

@@ -83,7 +83,6 @@ def test_test_crew_called_process_error(mock_subprocess_run, click):
@mock.patch("crewai_cli.evaluate_crew.click")
@mock.patch("crewai_cli.evaluate_crew.subprocess.run")
def test_test_crew_unexpected_exception(mock_subprocess_run, click):
# Arrange
n_iterations = 5
mock_subprocess_run.side_effect = Exception("Unexpected error")
evaluate_crew.evaluate_crew(n_iterations, "gpt-4o")

View File

@@ -35,7 +35,6 @@ class TestSettingsCommand(unittest.TestCase):
self.settings_command.list()
# Tests that the table is created skipping hidden settings
mock_table_instance.add_row.assert_has_calls(
[
call(
@@ -48,7 +47,6 @@ class TestSettingsCommand(unittest.TestCase):
]
)
# Tests that the table is printed
mock_console.print.assert_called_once_with(mock_table_instance)
def test_set_valid_keys(self):

View File

@@ -146,7 +146,6 @@ class TestAtomicFileOperations(unittest.TestCase):
self.temp_dir = tempfile.mkdtemp()
self.original_get_path = TokenManager._get_secure_storage_path
# Patch to use temp directory
def mock_get_path() -> Path:
return Path(self.temp_dir)
@@ -182,7 +181,6 @@ class TestAtomicFileOperations(unittest.TestCase):
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Create file first
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"original")
@@ -231,7 +229,6 @@ class TestAtomicFileOperations(unittest.TestCase):
tm._atomic_write_secure_file("test.txt", b"content")
# Check no temp files remain
temp_files = list(Path(self.temp_dir).glob(".test.txt.*"))
self.assertEqual(len(temp_files), 0)
@@ -285,7 +282,6 @@ class TestAtomicFileOperations(unittest.TestCase):
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Should not raise
tm._delete_secure_file("nonexistent.txt")

View File

@@ -27,9 +27,7 @@ def in_temp_dir():
@pytest.fixture
def tool_command():
# Create a temporary directory for each test to avoid token storage conflicts
with tempfile.TemporaryDirectory() as temp_dir:
# Mock the secure storage path to use the temp directory
with patch.object(
TokenManager, "_get_secure_storage_path", return_value=Path(temp_dir)
):
@@ -97,7 +95,6 @@ def test_install_success(
env=unittest.mock.ANY,
)
# Verify _print_current_organization was called
mock_print_org.assert_called_once()

View File

@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
import os
from typing import Any, Literal, cast
from typing import Annotated, Any, Literal, cast
from crewai.rag.core.base_embeddings_callable import EmbeddingFunction
from crewai.rag.embeddings.factory import build_embedder
@@ -8,10 +8,13 @@ from crewai.rag.embeddings.types import ProviderSpec
from crewai.tools import BaseTool
from pydantic import (
BaseModel,
BeforeValidator,
ConfigDict,
Field,
PlainSerializer,
TypeAdapter,
ValidationError,
WithJsonSchema,
field_validator,
model_validator,
)
@@ -100,6 +103,26 @@ class Adapter(BaseModel, ABC):
"""Add content to the knowledge base."""
def _resolve_adapter(value: Any) -> Any:
"""Validate the ``adapter`` field, returning a placeholder for dict/None input.
Adapter state is not round-tripped; the ``_ensure_adapter`` post-validator
rebuilds a fresh adapter from the tool's ``config``.
"""
if isinstance(value, Adapter):
return value
if value is None or isinstance(value, dict):
return RagTool._AdapterPlaceholder()
return value
def _serialize_adapter(adapter: Any, info: Any) -> Any:
"""Serialize the ``adapter`` field, dropping runtime state from the payload."""
if not isinstance(adapter, Adapter):
return adapter
return None
class RagTool(BaseTool):
class _AdapterPlaceholder(Adapter):
def query(
@@ -123,7 +146,12 @@ class RagTool(BaseTool):
similarity_threshold: float = 0.6
limit: int = 5
collection_name: str = "rag_tool_collection"
adapter: Adapter = Field(default_factory=_AdapterPlaceholder)
adapter: Annotated[
Adapter,
BeforeValidator(_resolve_adapter),
PlainSerializer(_serialize_adapter, when_used="json"),
WithJsonSchema({"type": ["object", "null"]}),
] = Field(default_factory=_AdapterPlaceholder)
config: RagToolConfig = Field(
default_factory=RagToolConfig,
description="Configuration format accepted by RagTool.",

View File

@@ -2912,12 +2912,6 @@
"humanized_name": "Search a CSV's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -3903,7 +3897,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -3964,12 +3961,6 @@
"humanized_name": "Search a Code Docs content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -4955,7 +4946,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -5641,12 +5635,6 @@
"humanized_name": "Search a DOCX's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -6632,7 +6620,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -7926,12 +7917,6 @@
"humanized_name": "Search a directory's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -8917,7 +8902,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -10762,12 +10750,6 @@
"humanized_name": "Search a github repo's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -11753,7 +11735,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -12041,12 +12026,6 @@
"humanized_name": "Search a JSON's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -13032,7 +13011,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -13316,12 +13298,6 @@
"humanized_name": "Search a MDX's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -14307,7 +14283,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -14774,12 +14753,6 @@
"humanized_name": "Search a database's table content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -15765,7 +15738,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -15967,21 +15943,6 @@
"title": "EnvVar",
"type": "object"
},
"JsonResponseFormat": {
"description": "Response format requesting raw JSON output (e.g. ``{\"type\": \"json_object\"}``).",
"properties": {
"type": {
"const": "json_object",
"title": "Type",
"type": "string"
}
},
"required": [
"type"
],
"title": "JsonResponseFormat",
"type": "object"
},
"LLM": {
"properties": {
"additional_params": {
@@ -16210,16 +16171,6 @@
"title": "Reasoning Effort"
},
"response_format": {
"anyOf": [
{
"$ref": "#/$defs/JsonResponseFormat"
},
{},
{
"type": "null"
}
],
"default": null,
"title": "Response Format"
},
"seed": {
@@ -17207,12 +17158,6 @@
"humanized_name": "Search a PDF's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -18198,7 +18143,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -18906,12 +18854,6 @@
"humanized_name": "Knowledge base",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -19897,7 +19839,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -20994,12 +20939,6 @@
"humanized_name": "Job Search",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -21985,7 +21924,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -22462,12 +22404,6 @@
"humanized_name": "Webpage to Markdown",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -23453,7 +23389,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -24307,12 +24246,6 @@
"humanized_name": "Search a txt's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -25298,7 +25231,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -26227,12 +26163,6 @@
"humanized_name": "Search in a specific website",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -27218,7 +27148,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -27279,12 +27212,6 @@
"humanized_name": "Search a XML's content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -28270,7 +28197,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -28331,12 +28261,6 @@
"humanized_name": "Search a Youtube Channels content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -29322,7 +29246,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",
@@ -29383,12 +29310,6 @@
"humanized_name": "Search a Youtube Video content",
"init_params_schema": {
"$defs": {
"Adapter": {
"description": "Abstract base class for RAG adapters.",
"properties": {},
"title": "Adapter",
"type": "object"
},
"AzureProviderConfig": {
"description": "Configuration for Azure provider.",
"properties": {
@@ -30374,7 +30295,10 @@
},
"properties": {
"adapter": {
"$ref": "#/$defs/Adapter"
"type": [
"object",
"null"
]
},
"collection_name": {
"default": "rag_tool_collection",

View File

@@ -16,6 +16,7 @@ from pydantic import (
FilePath,
PrivateAttr,
SecretStr,
field_serializer,
model_validator,
)
from typing_extensions import Self, deprecated
@@ -24,6 +25,7 @@ from crewai.a2a.auth.client_schemes import ClientAuthScheme
from crewai.a2a.auth.server_schemes import ServerAuthScheme
from crewai.a2a.extensions.base import ValidatedA2AExtension
from crewai.a2a.types import ProtocolVersion, TransportType, Url
from crewai.utilities.pydantic_schema_utils import serialize_model_class
try:
@@ -399,6 +401,11 @@ class A2AConfig(BaseModel):
default=None,
description="Optional Pydantic model for structured A2A agent responses",
)
@field_serializer("response_model", when_used="json")
def _serialize_response_model(self, value: Any) -> Any:
return serialize_model_class(value)
fail_fast: bool = Field(
default=True,
description="If True, raise error when agent unreachable; if False, skip",
@@ -488,6 +495,11 @@ class A2AClientConfig(BaseModel):
default=None,
description="Optional Pydantic model for structured A2A agent responses",
)
@field_serializer("response_model", when_used="json")
def _serialize_response_model(self, value: Any) -> Any:
return serialize_model_class(value)
fail_fast: bool = Field(
default=True,
description="If True, raise error when agent unreachable; if False, skip",

View File

@@ -380,8 +380,17 @@ class Agent(BaseAgent):
DeprecationWarning,
stacklevel=2,
)
kwargs: dict[str, int] = {}
if self.max_reasoning_attempts is not None:
kwargs["max_attempts"] = self.max_reasoning_attempts
self.planning_config = PlanningConfig(**kwargs)
if self.planning and self.planning_config is None:
# Bare planning=True should be bounded and avoid per-step
# PlannerObserver LLM calls unless explicitly configured.
self.planning_config = PlanningConfig(
max_attempts=self.max_reasoning_attempts,
reasoning_effort="low",
max_attempts=1,
)
return self
@@ -1110,9 +1119,14 @@ class Agent(BaseAgent):
"""
if self.agent_executor is None:
raise RuntimeError("Agent executor is not initialized.")
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before updating agent executor parameters."
)
if task is not None:
self.agent_executor.task = task
self.agent_executor.llm = self.llm
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt
@@ -1412,6 +1426,11 @@ class Agent(BaseAgent):
if _is_resuming_agent_executor(self.agent_executor):
executor = self.agent_executor
if not isinstance(self.llm, BaseLLM):
raise RuntimeError(
"LLM must be resolved before resuming agent executor."
)
executor.llm = self.llm
executor.tools = parsed_tools
executor.tools_names = get_tool_names(parsed_tools)
executor.tools_description = render_text_description_and_args(parsed_tools)

View File

@@ -19,15 +19,18 @@ class PlanningConfig(BaseModel):
Attributes:
reasoning_effort: Controls observation and replanning after each step.
- "low": Observe each step (validates success), but skip the
decide/replan/refine pipeline. Steps are marked complete and
execution continues linearly. Fastest option.
- "medium": Observe each step. On failure, trigger replanning.
- "low": Skip per-step PlannerObserver LLM calls (heuristic only);
skip the decide/replan/refine pipeline. Fastest option.
- "medium": Observe each step via LLM. On failure, trigger replanning.
On success, skip refinement and continue. Balanced option.
- "high": Full observation pipeline — observe every step, then
route through decide_next_action which can trigger early goal
achievement, full replanning, or lightweight refinement.
Most adaptive but adds latency per step.
observe_steps: When True, run PlannerObserver LLM calls after each step.
When False, use a lightweight heuristic (no extra LLM call).
When None (default), LLM observation runs for "medium" and "high"
only; "low" uses the heuristic path.
max_attempts: Maximum number of planning refinement attempts.
If None, will continue until the agent indicates readiness.
max_steps: Maximum number of steps in the generated plan.
@@ -76,12 +79,21 @@ class PlanningConfig(BaseModel):
default="medium",
description=(
"Controls post-step observation and replanning behavior. "
"'low' observes steps but skips replanning/refinement (fastest). "
"'medium' observes and replans only on step failure (balanced). "
"'low' skips per-step PlannerObserver LLM calls (fastest). "
"'medium' observes via LLM and replans only on step failure (balanced). "
"'high' runs full observation pipeline with replanning, refinement, "
"and early goal detection (most adaptive, highest latency)."
),
)
observe_steps: bool | None = Field(
default=None,
description=(
"Run PlannerObserver LLM calls after each step. "
"None (default): LLM observation for 'medium' and 'high' only; "
"'low' uses a heuristic (no extra LLM). "
"Set False to disable observation at any effort level."
),
)
max_attempts: int | None = Field(
default=None,
description=(

View File

@@ -39,7 +39,8 @@ logger = logging.getLogger(__name__)
class PlannerObserver:
"""Observes step execution results and decides on plan continuation.
After EVERY step execution, this class:
When ``observe_steps`` is enabled (see ``PlanningConfig``), after EVERY
step execution this class:
1. Analyzes what the step accomplished
2. Identifies new information learned
3. Decides if the remaining plan is still valid
@@ -83,6 +84,32 @@ class PlannerObserver:
return create_llm(config.llm)
return self.agent.llm
@staticmethod
def heuristic_observation(
*,
step_success: bool,
result: str = "",
) -> StepObservation:
"""Build an observation without an LLM call.
Used when ``PlanningConfig.observe_steps`` is False or when
``reasoning_effort`` is ``"low"`` (the default skips LLM observation).
Args:
step_success: Whether StepExecutor reported the step as successful.
result: The step result string (unused today; reserved for heuristics).
Returns:
A StepObservation derived from execution metadata only.
"""
_ = result
return StepObservation(
step_completed_successfully=step_success,
key_information_learned="",
remaining_plan_still_valid=True,
needs_full_replan=False,
)
def observe(
self,
completed_step: TodoItem,

View File

@@ -382,6 +382,15 @@ class Crew(FlowTrackable, BaseModel):
checkpoint_train: bool | None = Field(default=None)
checkpoint_kickoff_event_id: str | None = Field(default=None)
@field_validator(
"before_kickoff_callbacks", "after_kickoff_callbacks", mode="before"
)
@classmethod
def _drop_unresolvable_callbacks(cls, value: Any) -> Any:
if isinstance(value, list):
return [v for v in value if v is not None]
return value
@classmethod
def from_checkpoint(cls, config: CheckpointConfig) -> Crew:
"""Restore a Crew from a checkpoint, ready to resume via kickoff().
@@ -443,16 +452,20 @@ class Crew(FlowTrackable, BaseModel):
if node.event.type == "task_started" and node.event.task_id:
started_task_ids.add(node.event.task_id)
is_hierarchical = self.process == Process.hierarchical
resuming_task_agent_roles: set[str] = set()
for task in self.tasks:
if (
task.output is None
and task.agent is not None
and str(task.id) in started_task_ids
):
resuming_task_agent_roles.add(task.agent.role)
if task.output is not None or str(task.id) not in started_task_ids:
continue
executing_agent = self.manager_agent if is_hierarchical else task.agent
if executing_agent is not None:
resuming_task_agent_roles.add(executing_agent.role)
for agent in self.agents:
candidate_agents: list[BaseAgent] = list(self.agents)
if self.manager_agent is not None:
candidate_agents.append(self.manager_agent)
for agent in candidate_agents:
agent.crew = self
executor = agent.agent_executor
if (
@@ -467,7 +480,7 @@ class Crew(FlowTrackable, BaseModel):
agent.agent_executor = None
for task in self.tasks:
if task.agent is not None:
for agent in self.agents:
for agent in candidate_agents:
if agent.role == task.agent.role:
task.agent = agent
if agent.agent_executor is not None and task.output is None:
@@ -536,25 +549,9 @@ class Crew(FlowTrackable, BaseModel):
if state is None:
return
# Restore crew scope and the in-progress task scope. Inner scopes
# (agent, llm, tool) are re-created by the executor on resume.
stack: list[tuple[str, str]] = []
if self._kickoff_event_id:
stack.append((self._kickoff_event_id, "crew_kickoff_started"))
# Find the task_started event for the in-progress task (skipped on resume)
for task in self.tasks:
if task.output is None:
task_id_str = str(task.id)
for node in state.event_record.nodes.values():
if (
node.event.type == "task_started"
and node.event.task_id == task_id_str
):
stack.append((node.event.event_id, "task_started"))
break
break
restore_event_scope(tuple(stack))
# Restore last_event_id and emission counter from the record

View File

@@ -138,6 +138,36 @@ def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None:
_event_id_stack.set(stack)
def resume_task_scope(task_id: str) -> bool:
"""Push the latest recorded ``task_started`` scope for a task.
Args:
task_id: The task identifier to look up in the active event record.
Returns:
``True`` if a prior scope was pushed; ``False`` otherwise.
"""
from crewai.events.event_bus import crewai_event_bus
state = crewai_event_bus._runtime_state
if state is None:
return False
latest_event_id: str | None = None
latest_seq = -1
for node in list(state.event_record.nodes.values()):
ev = node.event
if ev.type != "task_started" or ev.task_id != task_id:
continue
seq = ev.emission_sequence or 0
if seq > latest_seq:
latest_seq = seq
latest_event_id = ev.event_id
if latest_event_id is None:
return False
push_event_scope(latest_event_id, "task_started")
return True
def push_event_scope(event_id: str, event_type: str = "") -> None:
"""Push an event ID and type onto the scope stack."""
config = _event_context_config.get() or _default_config

View File

@@ -108,6 +108,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agents.planner_observer import PlannerObserver
from crewai.agents.tools_handler import ToolsHandler
from crewai.llms.base_llm import BaseLLM
from crewai.tools.tool_types import ToolResult
@@ -173,8 +174,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
executor_type: Literal["experimental"] = "experimental"
suppress_flow_events: bool = True # always suppress for executor
llm: BaseLLM = Field(exclude=True)
prompt: SystemPromptResult | StandardPromptResult = Field(exclude=True)
llm: BaseLLM | None = Field(default=None, exclude=True)
prompt: SystemPromptResult | StandardPromptResult | None = Field(
default=None, exclude=True
)
max_iter: int = Field(default=25, exclude=True)
tools: list[CrewStructuredTool] = Field(default_factory=list, exclude=True)
tools_names: str = Field(default="", exclude=True)
@@ -208,7 +211,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
_has_been_invoked: bool = PrivateAttr(default=False)
_instance_id: str = PrivateAttr(default_factory=lambda: str(uuid4())[:8])
_step_executor: Any = PrivateAttr(default=None)
_planner_observer: Any = PrivateAttr(default=None)
_planner_observer: PlannerObserver | None = PrivateAttr(default=None)
@model_validator(mode="after")
def _setup_executor(self) -> Self:
@@ -358,7 +361,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
)
return self._step_executor
def _ensure_planner_observer(self) -> Any:
def _ensure_planner_observer(self) -> PlannerObserver:
"""Lazily create the PlannerObserver (avoids circular imports)."""
if self._planner_observer is None:
from crewai.agents.planner_observer import PlannerObserver
@@ -405,6 +408,63 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
return int(config.step_timeout) if config.step_timeout is not None else None
return None
def _should_observe_steps(self) -> bool:
"""Whether to run PlannerObserver LLM calls after each step.
Explicit ``observe_steps=False`` disables observation at any effort level.
``observe_steps=True`` forces it even at ``reasoning_effort="low"``.
When unset, ``low`` skips LLM observation; ``medium`` and ``high`` run it.
"""
config = self.agent.planning_config
if config is not None and config.observe_steps is not None:
return bool(config.observe_steps)
if config is not None and config.reasoning_effort == "low":
return False
return True
def _step_success_from_log(self, step_number: int) -> bool | None:
"""Read StepExecutor success flag from the execution audit log."""
for entry in reversed(self.state.execution_log):
if (
entry.get("type") == "step_execution"
and entry.get("step_number") == step_number
):
success = entry.get("success")
if success is not None:
return bool(success)
return None
def _observe_completed_step(
self,
*,
completed_step: TodoItem,
result: str,
all_completed: list[TodoItem],
remaining_todos: list[TodoItem],
step_success: bool | None = None,
) -> StepObservation:
"""Observe a completed step via LLM or a lightweight heuristic."""
from crewai.agents.planner_observer import PlannerObserver
if self._should_observe_steps():
observer = self._ensure_planner_observer()
return observer.observe(
completed_step=completed_step,
result=result,
all_completed=all_completed,
remaining_todos=remaining_todos,
)
if step_success is None:
step_success = self._step_success_from_log(completed_step.step_number)
if step_success is None:
step_success = True
return PlannerObserver.heuristic_observation(
step_success=step_success,
result=result,
)
def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext:
"""Build an isolated execution context for a single todo.
@@ -448,13 +508,13 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
) -> Literal["step_observed_low", "step_observed_medium", "step_observed_high"]:
"""Observe step result and route based on reasoning_effort level.
Always runs PlannerObserver.observe() to validate whether the step
succeeded. Then routes to the appropriate handler based on the
agent's reasoning_effort setting:
Runs PlannerObserver LLM observation when enabled (medium/high by
default; low uses a heuristic with no extra LLM call). Then routes to
the appropriate handler based on the agent's reasoning_effort setting:
- "low": observe → mark complete → continue (no replan/refine)
- "medium": observe → replan on failure only (no refine)
- "high": observe → full decide pipeline (replan/refine/goal-achieved)
- "low": heuristic observe → mark complete → continue (no replan/refine)
- "medium": LLM observe → replan on failure only (no refine)
- "high": LLM observe → full decide pipeline (replan/refine/goal-achieved)
Based on PLAN-AND-ACT Section 3.3.
"""
@@ -465,11 +525,10 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
# No todo — route to low handler which will just continue
return "step_observed_low"
observer = self._ensure_planner_observer()
all_completed = self.state.todos.get_completed_todos()
remaining = self.state.todos.get_pending_todos()
observation = observer.observe(
observation = self._observe_completed_step(
completed_step=current_todo,
result=current_todo.result or "",
all_completed=all_completed,
@@ -489,6 +548,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
"needs_full_replan": observation.needs_full_replan,
"goal_already_achieved": observation.goal_already_achieved,
"reasoning_effort": effort,
"llm_observation": self._should_observe_steps(),
}
)
@@ -530,10 +590,8 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
observation = self.state.observations.get(current_todo.step_number)
# Even at low effort, don't ignore a hard step failure.
# A hard failure is one where the step did not succeed AND a replan
# is explicitly required (e.g. required tool not found, permission
# denied, environment misconfiguration).
# Even at low effort, don't record failed steps as completed. Only
# trigger replanning for hard failures that explicitly require it.
if (
observation
and not observation.step_completed_successfully
@@ -555,6 +613,22 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
)
return "replan_now"
if observation and not observation.step_completed_successfully:
self.state.todos.mark_failed(
current_todo.step_number, result=current_todo.result
)
if self.agent.verbose:
failed = len(self.state.todos.get_failed_todos())
total = len(self.state.todos.items)
PRINTER.print(
content=(
f"[Low] Step {current_todo.step_number} failed "
f"({failed} failed/{total} total) — continuing"
),
color="yellow",
)
return "continue_plan"
self.state.todos.mark_completed(
current_todo.step_number, result=current_todo.result
)
@@ -1107,17 +1181,17 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
# Observe each completed step sequentially (observation updates shared state)
effort = self._get_reasoning_effort()
observer = self._ensure_planner_observer()
for todo, _result in step_results:
for todo, step_result in step_results:
all_completed = self.state.todos.get_completed_todos()
remaining = self.state.todos.get_pending_todos()
observation = observer.observe(
observation = self._observe_completed_step(
completed_step=todo,
result=todo.result or "",
all_completed=all_completed,
remaining_todos=remaining,
step_success=step_result.success,
)
self.state.observations[todo.step_number] = observation
@@ -1132,6 +1206,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
"needs_full_replan": observation.needs_full_replan,
"goal_already_achieved": observation.goal_already_achieved,
"reasoning_effort": effort,
"llm_observation": self._should_observe_steps(),
}
)
@@ -2589,6 +2664,11 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if self.llm is None or self.prompt is None:
raise RuntimeError(
"AgentExecutor.llm or .prompt is unset; the executor was "
"not fully restored or initialized before execution."
)
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint
@@ -2690,6 +2770,11 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self._kickoff_input = inputs.get("input", "")
if self.llm is None or self.prompt is None:
raise RuntimeError(
"AgentExecutor.llm or .prompt is unset; the executor was "
"not fully restored or initialized before execution."
)
if "system" in self.prompt:
from crewai.llms.cache import mark_cache_breakpoint

View File

@@ -23,6 +23,7 @@ from pydantic import (
BaseModel,
Field,
PrivateAttr,
field_serializer,
ValidationError,
field_validator,
model_validator,
@@ -95,7 +96,10 @@ from crewai.utilities.guardrail import process_guardrail, serialize_guardrail_fo
from crewai.utilities.guardrail_types import GuardrailCallable, GuardrailType
from crewai.utilities.i18n import I18N_DEFAULT
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.pydantic_schema_utils import (
generate_model_description,
serialize_model_class,
)
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.types import LLMMessage
@@ -236,6 +240,11 @@ class LiteAgent(FlowTrackable, BaseModel):
response_format: type[BaseModel] | None = Field(
default=None, description="Pydantic model for structured output"
)
@field_serializer("response_format", when_used="json")
def _serialize_response_format(self, value: Any) -> Any:
return serialize_model_class(value)
verbose: bool = Field(
default=False, description="Whether to print execution details"
)

View File

@@ -23,6 +23,7 @@ from pydantic import (
ConfigDict,
Field,
PrivateAttr,
field_serializer,
model_validator,
)
from typing_extensions import TypedDict
@@ -42,6 +43,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageStartedEvent,
)
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.pydantic_schema_utils import serialize_model_class
try:
@@ -159,6 +161,10 @@ class BaseLLM(BaseModel, ABC):
)
additional_params: dict[str, Any] = Field(default_factory=dict)
@field_serializer("response_format", when_used="json", check_fields=False)
def _serialize_response_format(self, value: Any) -> Any:
return serialize_model_class(value)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("stop", "stop_sequences"):
if value is None:

View File

@@ -67,7 +67,11 @@ class EventNode(BaseModel):
event: Annotated[
BaseEvent,
BeforeValidator(_resolve_event),
PlainSerializer(lambda v: v.model_dump()),
PlainSerializer(
lambda v, info: (
v.model_dump(mode="json") if info.mode == "json" else v.model_dump()
),
),
]
edges: dict[EdgeType, list[str]] = Field(default_factory=dict)

View File

@@ -40,6 +40,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
from crewai.context import reset_current_task_id, set_current_task_id
from crewai.core.providers.content_processor import process_content
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import resume_task_scope
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -661,7 +662,10 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
if not (agent.agent_executor and agent.agent_executor._resuming):
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
@@ -783,7 +787,10 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
if not (agent.agent_executor and agent.agent_executor._resuming):
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)

View File

@@ -8,7 +8,6 @@ from inspect import Parameter, signature
import json
import threading
from typing import (
Annotated,
Any,
Generic,
ParamSpec,
@@ -22,10 +21,10 @@ from pydantic import (
ConfigDict,
Field,
GetCoreSchemaHandler,
PlainSerializer,
PrivateAttr,
computed_field,
create_model,
field_serializer,
field_validator,
)
from pydantic_core import CoreSchema, core_schema
@@ -145,15 +144,18 @@ class BaseTool(BaseModel, ABC):
default_factory=list,
description="List of environment variables used by the tool.",
)
args_schema: Annotated[
type[PydanticBaseModel],
PlainSerializer(_serialize_schema, return_type=dict | None, when_used="json"),
] = Field(
args_schema: type[PydanticBaseModel] = Field(
default=_ArgsSchemaPlaceholder,
validate_default=True,
description="The schema for the arguments that the tool accepts.",
)
@field_serializer("args_schema", when_used="json")
def _serialize_args_schema(
self, schema: type[PydanticBaseModel] | None
) -> dict[str, Any] | None:
return _serialize_schema(schema)
description_updated: bool = Field(
default=False, description="Flag to check if the description has been updated."
)

View File

@@ -130,18 +130,15 @@ def _resolve_dotted_path(path: str) -> Callable[..., Any]:
raise ValueError(f"Cannot resolve callback {path!r}")
def callable_to_string(fn: Callable[..., Any]) -> str:
"""Serialize a callable to its dotted-path string representation.
Uses ``fn.__module__`` and ``fn.__qualname__`` to produce a string such
as ``"builtins.print"``. Lambdas and closures produce paths that contain
``<locals>`` and cannot be round-tripped via :func:`string_to_callable`.
def callable_to_string(fn: Callable[..., Any]) -> str | None:
"""Serialize a module-level callable as a ``"module.qualname"`` string.
Args:
fn: The callable to serialize.
Returns:
A dotted string of the form ``"module.qualname"``.
The dotted path, or ``None`` for lambdas and closures (not
resolvable by :func:`string_to_callable`).
"""
module = getattr(fn, "__module__", None)
qualname = getattr(fn, "__qualname__", None)
@@ -150,6 +147,8 @@ def callable_to_string(fn: Callable[..., Any]) -> str:
f"Cannot serialize {fn!r}: missing __module__ or __qualname__. "
"Use a module-level named function for checkpointable callbacks."
)
if "<locals>" in qualname or qualname == "<lambda>":
return None
return f"{module}.{qualname}"

View File

@@ -167,17 +167,12 @@ class PickleHandler:
Returns:
The data loaded from the file.
"""
with store_lock(f"file:{os.path.realpath(self.file_path)}"):
if (
not os.path.exists(self.file_path)
or os.path.getsize(self.file_path) == 0
):
return {}
if not os.path.exists(self.file_path):
return {}
with open(self.file_path, "rb") as file:
try:
with store_lock(f"file:{os.path.realpath(self.file_path)}"):
try:
with open(self.file_path, "rb") as file:
return pickle.load(file) # noqa: S301
except EOFError:
return {}
except Exception:
raise
except (FileNotFoundError, EOFError):
return {}

View File

@@ -782,6 +782,20 @@ def _inline_top_level_ref(schema: dict[str, Any]) -> dict[str, Any]:
return schema
def serialize_model_class(value: Any) -> Any:
"""Serialize a ``type[BaseModel]`` field value as its JSON schema.
Args:
value: A ``type[BaseModel]`` subclass, ``None``, or another union member.
Returns:
``value.model_json_schema()`` for model classes, ``value`` otherwise.
"""
if isinstance(value, type) and issubclass(value, BaseModel):
return value.model_json_schema()
return value
def create_model_from_schema( # type: ignore[no-any-unimported]
json_schema: dict[str, Any],
*,

View File

@@ -169,10 +169,11 @@ class AgentReasoning:
if self.agent.planning_config is not None:
return self.agent.planning_config
# Fallback for backward compatibility
return PlanningConfig(
max_attempts=getattr(self.agent, "max_reasoning_attempts", None),
)
# Fallback when planning is enabled without an explicit config
max_attempts = getattr(self.agent, "max_reasoning_attempts", None)
if max_attempts is not None:
return PlanningConfig(max_attempts=max_attempts)
return PlanningConfig()
def _resolve_llm(self) -> LLM:
"""Resolve which LLM to use for planning.

View File

@@ -1083,6 +1083,21 @@ def test_agent_use_trained_data_honors_env_var(crew_training_handler, monkeypatc
)
def test_agent_use_trained_data_skips_load_when_file_missing(tmp_path, monkeypatch):
monkeypatch.setenv(
"CREWAI_TRAINED_AGENTS_FILE", str(tmp_path / "does_not_exist.pkl")
)
agent = Agent(role="researcher", goal="test goal", backstory="test backstory")
with patch(
"crewai.utilities.file_handler.store_lock",
side_effect=AssertionError("kickoff acquired lock with no trained-agents file"),
):
result = agent._use_trained_data(task_prompt="What is 1 + 1?")
assert result == "What is 1 + 1?"
def test_agent_max_retry_limit():
agent = Agent(
role="test role",

View File

@@ -997,6 +997,18 @@ class TestNativeToolExecution:
class TestPlannerObserver:
def test_heuristic_observation_reflects_step_success(self):
from crewai.agents.planner_observer import PlannerObserver
ok = PlannerObserver.heuristic_observation(step_success=True, result="42")
assert ok.step_completed_successfully is True
assert ok.needs_full_replan is False
failed = PlannerObserver.heuristic_observation(
step_success=False, result="Error: timeout"
)
assert failed.step_completed_successfully is False
def test_observe_fallback_is_conservative_on_llm_error(self):
llm = Mock()
llm.call.side_effect = RuntimeError("llm unavailable")
@@ -1379,19 +1391,93 @@ class TestResponseFormatWithKickoff:
class TestReasoningEffort:
"""Test reasoning_effort levels in PlanningConfig.
- low: observe() runs (validates step success), but skip decide/replan/refine
- low: heuristic observation (no LLM), skip decide/replan/refine
- medium: observe() runs, replan on failure only (mocked)
- high: full observation pipeline with decide/replan/refine/goal-achieved
"""
def test_should_observe_steps_respects_config(self):
"""observe_steps and reasoning_effort gate PlannerObserver LLM calls."""
from crewai.agent.planning_config import PlanningConfig
from crewai.experimental.agent_executor import AgentExecutor
executor = Mock(spec=AgentExecutor)
executor._should_observe_steps = (
AgentExecutor._should_observe_steps.__get__(executor)
)
executor.agent = Mock()
executor.agent.planning_config = PlanningConfig(reasoning_effort="low")
assert executor._should_observe_steps() is False
executor.agent.planning_config = PlanningConfig(
reasoning_effort="low", observe_steps=True
)
assert executor._should_observe_steps() is True
executor.agent.planning_config = PlanningConfig(
reasoning_effort="high", observe_steps=False
)
assert executor._should_observe_steps() is False
executor.agent.planning_config = PlanningConfig(reasoning_effort="medium")
assert executor._should_observe_steps() is True
executor.agent.planning_config = None
assert executor._should_observe_steps() is True
def test_reasoning_effort_low_skips_planner_observer_llm(self):
"""Low effort must not call PlannerObserver.observe (no per-step LLM)."""
from crewai.agent.planning_config import PlanningConfig
from crewai.experimental.agent_executor import AgentExecutor
from crewai.utilities.planning_types import TodoItem, TodoList
executor = Mock(spec=AgentExecutor)
executor.agent = Mock()
executor.agent.planning_config = PlanningConfig(reasoning_effort="low")
executor.state = Mock()
executor.state.execution_log = [
{"type": "step_execution", "step_number": 1, "success": True},
]
executor._should_observe_steps = (
AgentExecutor._should_observe_steps.__get__(executor)
)
executor._step_success_from_log = (
AgentExecutor._step_success_from_log.__get__(executor)
)
executor._observe_completed_step = (
AgentExecutor._observe_completed_step.__get__(executor)
)
executor._ensure_planner_observer = Mock()
todo = TodoItem(
step_number=1,
description="Step one",
status="running",
result="done",
)
executor.state.todos = TodoList(items=[todo])
observation = executor._observe_completed_step(
completed_step=todo,
result="done",
all_completed=[],
remaining_todos=[],
)
executor._ensure_planner_observer.assert_not_called()
assert observation.step_completed_successfully is True
@pytest.mark.vcr()
def test_reasoning_effort_low_skips_decide_and_replan(self):
"""Low effort: observe runs but decide/replan/refine are never called.
"""Low effort: heuristic observe, no decide/replan/refine LLM pipeline.
Verifies that with reasoning_effort='low':
1. The agent produces a correct result
2. The observation phase still runs (observations are stored)
2. Observations are still stored (heuristic path)
3. The decide_next_action/refine/replan pipeline is bypassed
4. Per-step observation did not use the PlannerObserver LLM
"""
from crewai import Agent, PlanningConfig
from crewai.llm import LLM
@@ -1429,11 +1515,11 @@ class TestReasoningEffort:
assert result is not None
assert "10" in str(result)
# Verify observations were still collected (observe() ran)
# Verify observations were still collected (heuristic path, no LLM)
executor = executor_ref[0]
if executor is not None and executor.state.todos.items:
assert len(executor.state.observations) > 0, (
"Low effort should still run observe() to validate steps"
"Low effort should still record heuristic observations"
)
# Verify no replan was triggered
@@ -1448,6 +1534,7 @@ class TestReasoningEffort:
]
for log in observation_logs:
assert log.get("reasoning_effort") == "low"
assert log.get("llm_observation") is False
@pytest.mark.vcr()
def test_reasoning_effort_high_runs_full_observation_pipeline(self):
@@ -1620,6 +1707,47 @@ class TestReasoningEffort:
assert todo.status == "completed"
assert todo.result == "Done successfully"
def test_reasoning_effort_low_marks_failed_steps_failed_without_replan(self):
"""Low effort records failed heuristic observations without replanning."""
from crewai.experimental.agent_executor import AgentExecutor
from crewai.utilities.planning_types import (
StepObservation,
TodoItem,
TodoList,
)
executor = Mock(spec=AgentExecutor)
executor.agent = Mock()
executor.agent.verbose = False
executor.agent.planning_config = Mock()
executor.agent.planning_config.reasoning_effort = "low"
executor.handle_step_observed_low = (
AgentExecutor.handle_step_observed_low.__get__(executor)
)
todo = TodoItem(
step_number=1,
description="Do something",
status="running",
result="Error: tool failed",
)
todo_list = TodoList(items=[todo])
executor.state = Mock()
executor.state.todos = todo_list
executor.state.observations = {
1: StepObservation(
step_completed_successfully=False,
key_information_learned="",
remaining_plan_still_valid=True,
needs_full_replan=False,
)
}
route = executor.handle_step_observed_low()
assert route == "continue_plan"
assert todo.status == "failed"
assert todo.result == "Error: tool failed"
def test_planning_config_reasoning_effort_default_is_medium(self):
"""Verify PlanningConfig defaults reasoning_effort to 'medium'
(aligned with runtime default in _get_reasoning_effort)."""

View File

@@ -23,6 +23,8 @@ def test_planning_config_default_values():
assert config.plan_prompt is None
assert config.refine_prompt is None
assert config.llm is None
assert config.observe_steps is None
assert config.reasoning_effort == "medium"
def test_planning_config_custom_values():
@@ -88,6 +90,28 @@ def test_agent_with_planning_config_disabled():
assert agent.planning_enabled is False
def test_planning_true_without_config_sets_bounded_max_attempts():
"""planning=True alone must not leave max_attempts=None (infinite refine loop)."""
llm = LLM("gpt-4o-mini")
agent = Agent(
role="Test Agent",
goal="Test",
backstory="Test",
llm=llm,
planning=True,
verbose=False,
)
assert agent.planning_config is not None
assert agent.planning_config.max_attempts == 1
assert agent.planning_config.reasoning_effort == "low"
assert agent.planning_config.max_steps == 20
assert agent.planning_config.max_replans == 3
assert agent.planning_config.max_step_iterations == 15
assert agent.planning_config.step_timeout is None
def test_planning_enabled_property():
"""Test the planning_enabled property on Agent."""
llm = LLM("gpt-4o-mini")

View File

@@ -11,6 +11,7 @@ from crewai.events.event_context import (
MismatchBehavior,
StackDepthExceededError,
_event_context_config,
_event_id_stack,
EventContextConfig,
get_current_parent_id,
get_enclosing_parent_id,
@@ -21,6 +22,7 @@ from crewai.events.event_context import (
pop_event_scope,
push_event_scope,
reset_last_event_id,
resume_task_scope,
set_last_event_id,
set_triggering_event_id,
triggered_by_scope,
@@ -180,6 +182,91 @@ class TestTriggeredByScope:
assert get_triggering_event_id() is None
class TestResumeTaskScope:
"""Tests for the checkpoint-resume scope helper."""
@pytest.fixture(autouse=True)
def _reset_stack(self) -> None:
_event_id_stack.set(())
def _bind_runtime_state(self, *event_dicts: dict[str, object]):
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskStartedEvent
from crewai.state.event_record import EventRecord
from crewai.state.runtime import RuntimeState
record = EventRecord()
for spec in event_dicts:
ev = TaskStartedEvent(context=None, task=None)
ev.task_id = spec["task_id"] # type: ignore[assignment]
ev.event_id = spec["event_id"] # type: ignore[assignment]
ev.emission_sequence = spec["emission_sequence"] # type: ignore[assignment]
record.add(ev)
state = RuntimeState(root=[])
state._event_record = record
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = state
return crewai_event_bus, previous
def test_returns_false_when_no_runtime_state(self) -> None:
from crewai.events import crewai_event_bus
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = None
try:
assert resume_task_scope("any-task") is False
assert _event_id_stack.get() == ()
finally:
crewai_event_bus._runtime_state = previous
def test_returns_false_when_no_matching_event(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "other", "event_id": "e1", "emission_sequence": 1},
)
try:
assert resume_task_scope("missing") is False
assert _event_id_stack.get() == ()
finally:
bus._runtime_state = previous
def test_pushes_latest_event_for_task(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "e1", "emission_sequence": 1},
{"task_id": "t1", "event_id": "e2", "emission_sequence": 5},
{"task_id": "t1", "event_id": "e3", "emission_sequence": 3},
{"task_id": "t2", "event_id": "x1", "emission_sequence": 9},
)
try:
assert resume_task_scope("t1") is True
assert _event_id_stack.get() == (("e2", "task_started"),)
finally:
bus._runtime_state = previous
def test_pairs_cleanly_with_task_completed(self) -> None:
"""The pushed scope must be popped by a matching task_completed."""
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.tasks.task_output import TaskOutput
push_event_scope("kickoff-1", "crew_kickoff_started")
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "started-1", "emission_sequence": 1},
)
try:
assert resume_task_scope("t1") is True
output = TaskOutput(description="d", raw="r", agent="a")
completed = TaskCompletedEvent(output=output, task=None)
completed.task_id = "t1"
crewai_event_bus.emit(None, completed)
crewai_event_bus.flush()
assert _event_id_stack.get() == (("kickoff-1", "crew_kickoff_started"),)
assert completed.started_event_id == "started-1"
finally:
bus._runtime_state = previous
_event_id_stack.set(())
def test_agent_scope_preserved_after_tool_error_event() -> None:
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import functools
import os
from collections.abc import Callable
from typing import Any
import pytest
from pydantic import BaseModel, ValidationError
@@ -93,10 +94,18 @@ class TestCallableToString:
result = callable_to_string(print)
assert result == "builtins.print"
def test_lambda_produces_locals_path(self) -> None:
def test_lambda_returns_none(self) -> None:
fn = lambda: None # noqa: E731
result = callable_to_string(fn)
assert "<lambda>" in result
assert callable_to_string(fn) is None
def test_closure_returns_none(self) -> None:
def outer() -> Callable[[], None]:
def inner() -> None:
return None
return inner
assert callable_to_string(outer()) is None
def test_missing_qualname_raises(self) -> None:
obj = type("NoQual", (), {"__module__": "test"})()

View File

@@ -1,6 +1,7 @@
import os
import tempfile
import unittest
from unittest.mock import patch
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -53,3 +54,23 @@ class InternalCrewTrainingHandler(unittest.TestCase):
# Assert that the new agent and data are appended correctly
data = self.handler.load()
assert data[agent_id][train_iteration] == new_data
def test_load_missing_file_does_not_acquire_lock(self):
handler = CrewTrainingHandler(self.temp_file.name + ".missing")
with patch(
"crewai.utilities.file_handler.store_lock",
side_effect=AssertionError("load() acquired lock for missing file"),
):
assert handler.load() == {}
def test_load_acquires_lock_for_zero_size_file(self):
# Empty file mimics a concurrent save() mid-truncation (open "wb").
assert os.path.getsize(self.temp_file.name) == 0
with patch(
"crewai.utilities.file_handler.store_lock",
side_effect=AssertionError("load() short-circuited on size 0"),
):
with self.assertRaises(AssertionError):
self.handler.load()

View File

@@ -189,6 +189,7 @@ exclude-newer = "3 days"
# authlib <1.6.11 has GHSA-jj8c-mmj3-mmgv (CSRF bypass in cache-based state storage).
# pip <26.1.1 has GHSA-58qw-9mgm-455v (archive handling); OSV considers 26.1.1 unaffected.
# paramiko <5.0.0 has GHSA-r374-rxx8-8654 (SHA-1 in rsakey.py); OSV considers 5.0.0 unaffected. Transitive via composio-core.
# starlette <1.0.1 has PYSEC-2026-161 (missing Host header validation poisons request.url.path, bypassing path-based auth). Transitive via fastapi.
# litellm 1.83.8+ hard-pins openai==2.24.0, missing openai.types.responses used by crewai;
# override to >=2.30.0 (the version litellm 1.83.7 used) until upstream relaxes the pin.
override-dependencies = [
@@ -209,6 +210,7 @@ override-dependencies = [
"authlib>=1.6.11",
"pip>=26.1.1",
"paramiko>=5.0.0",
"starlette>=1.0.1",
]
[tool.uv.workspace]

12
uv.lock generated
View File

@@ -13,9 +13,12 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-05-17T14:20:01.778505Z"
exclude-newer = "2026-05-19T15:27:50.647689Z"
exclude-newer-span = "P3D"
[options.exclude-newer-package]
starlette = "2026-05-22T16:00:00Z"
[manifest]
members = [
"crewai",
@@ -40,6 +43,7 @@ overrides = [
{ name = "pypdf", specifier = ">=6.10.2,<7" },
{ name = "python-multipart", specifier = ">=0.0.27,<1" },
{ name = "rich", specifier = ">=13.7.1" },
{ name = "starlette", specifier = ">=1.0.1" },
{ name = "transformers", marker = "python_full_version >= '3.10'", specifier = ">=5.4.0" },
{ name = "urllib3", specifier = ">=2.7.0" },
{ name = "uv", specifier = ">=0.11.6,<1" },
@@ -8528,15 +8532,15 @@ wheels = [
[[package]]
name = "starlette"
version = "1.0.0"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/81/69/17425771797c36cded50b7fe44e850315d039f28b15901ab44839e70b593/starlette-1.0.0.tar.gz", hash = "sha256:6a4beaf1f81bb472fd19ea9b918b50dc3a77a6f2e190a12954b25e6ed5eea149", size = 2655289, upload-time = "2026-03-22T18:29:46.779Z" }
sdist = { url = "https://files.pythonhosted.org/packages/08/a3/84e821cc54b4ab50ae6dbc6ac3800a651b65ec35f045cc73785380654057/starlette-1.0.1.tar.gz", hash = "sha256:512399c5f1de7fac99c88572212ded9ddeddef2fb32afa82d724000e88b38f4f", size = 2659596, upload-time = "2026-05-21T21:58:58.433Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/c9/584bc9651441b4ba60cc4d557d8a547b5aff901af35bda3a4ee30c819b82/starlette-1.0.0-py3-none-any.whl", hash = "sha256:d3ec55e0bb321692d275455ddfd3df75fff145d009685eb40dc91fc66b03d38b", size = 72651, upload-time = "2026-03-22T18:29:45.111Z" },
{ url = "https://files.pythonhosted.org/packages/ec/e1/b2df4bc09a1e51ff664c1e17018a4274b42e5e9352e4a478ea540512dc88/starlette-1.0.1-py3-none-any.whl", hash = "sha256:7c0e69b2ee1c848bd54669d908500117a3ee13de603a21427e5c6fc1adf98dcd", size = 72802, upload-time = "2026-05-21T21:58:56.551Z" },
]
[[package]]