Compare commits

..

1 Commits

Author SHA1 Message Date
Vinicius Brasil
b9586874b3 Add lead scoring FlowDefinition example 2026-06-16 10:54:34 -07:00
16 changed files with 238 additions and 1460 deletions

View File

@@ -0,0 +1,23 @@
import logging
from typing import Literal
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
logger = logging.getLogger("lead_flow")
class LogLeadInput(BaseModel):
message: str = Field(description="The message to log.")
level: Literal["debug", "info", "warning", "error"] = "info"
class LogLeadTool(BaseTool):
name: str = "log_lead"
description: str = "Log a message about a lead that was not pursued."
args_schema: type[BaseModel] = LogLeadInput
def _run(self, message: str, level: str = "info") -> str:
logger.log(logging.getLevelName(level.upper()), message)
return message

View File

@@ -0,0 +1,98 @@
# uv run --project lib/crewai crewai run --definition lib/crewai/examples/flows/lead_scoring_flow.yaml --inputs '{"lead":{"name":"Dana Lee","company":"Acme","employees":1200}}'
# uv run --project lib/crewai crewai run --definition lib/crewai/examples/flows/lead_scoring_flow.yaml --inputs '{"lead":{"name":"Sam Poe","company":"Tiny LLC","employees":3}}'
schema: crewai.flow/v1
name: LeadScoringFlow
description: Score an inbound lead, then route high-scoring leads to outreach and the rest to a log tool.
state:
type: dict
default:
lead: {}
methods:
score_lead:
start: true
do:
call: crew
with:
name: lead_scoring_crew
verbose: true
agents:
scorer:
role: Lead Qualification Analyst
goal: Assign a 0-100 fit score to inbound lead {name} from {company}
backstory: >
A revenue-ops veteran who scores leads against a clear ideal
customer profile: company size is the dominant signal.
tasks:
- name: score_lead_task
agent: scorer
description: >
Evaluate the inbound lead {name} from {company} ({employees}
employees) against this rubric, where company size dominates:
1000+ employees scores 85-100 (hot), 200-999 scores 70-84 (warm),
and under 200 scores 0-69 (cold). Return an integer score with a
one-line rationale.
expected_output: >
A LeadScore with an integer `score` (0-100), a short `reasoning`,
and a `tier` of "hot", "warm", or "cold".
output_pydantic:
type: object
properties:
score:
type: integer
reasoning:
type: string
tier:
type: string
enum: [hot, warm, cold]
required: [score, reasoning, tier]
inputs:
name: "${state.lead.name}"
company: "${state.lead.company}"
employees: "${state.lead.employees}"
route_by_score:
listen: score_lead
router: true
emit: [qualified, unqualified]
do:
call: expression
expr: "outputs.score_lead.pydantic.score >= 80 ? 'qualified' : 'unqualified'"
run_outreach:
listen: qualified
do:
call: crew
with:
name: outreach_crew
verbose: true
agents:
sdr:
role: Outbound SDR
goal: Draft a tailored first-touch email to {name} at {company}
backstory: >
A top-performing SDR who writes concise, personalized outreach
that earns replies from busy buyers.
tasks:
- name: draft_outreach_task
agent: sdr
description: >
Write a short, personalized first-touch email to {name} at
{company}. Ground the hook in this qualification rationale:
"{reasoning}".
expected_output: A ready-to-send outreach email with a subject line and body.
inputs:
name: "${state.lead.name}"
company: "${state.lead.company}"
reasoning: "${outputs.score_lead.pydantic.reasoning}"
log_unqualified:
listen: unqualified
do:
call: tool
ref: lead_flow.tools:LogLeadTool
with:
message: "${'Skipped low-fit lead ' + state.lead.name + ' (score ' + string(outputs.score_lead.pydantic.score) + ')'}"
level: info

View File

@@ -28,7 +28,6 @@ from crewai.flow.conversational_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
)
from crewai.project.crew_definition import CrewDefinition
logger = logging.getLogger(__name__)
@@ -42,7 +41,6 @@ __all__ = [
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
@@ -178,15 +176,6 @@ class FlowToolActionDefinition(BaseModel):
with_: dict[str, Any] | None = Field(default=None, alias="with")
class FlowCrewActionDefinition(BaseModel):
"""A Flow method action that builds and kicks off a CrewAI crew."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: TypingLiteral["crew"]
with_: CrewDefinition = Field(alias="with")
class FlowExpressionActionDefinition(BaseModel):
"""A Flow method action that evaluates a CEL expression."""
@@ -197,10 +186,7 @@ class FlowExpressionActionDefinition(BaseModel):
FlowInnerActionDefinition = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition
)
@@ -250,7 +236,6 @@ class FlowEachActionDefinition(BaseModel):
FlowActionDefinition = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowCrewActionDefinition
| FlowExpressionActionDefinition
| FlowEachActionDefinition
)

View File

@@ -11,7 +11,6 @@ from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowCrewActionDefinition,
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowExpressionActionDefinition,
@@ -105,25 +104,6 @@ class ToolAction:
) from e
class CrewAction:
definition_type = FlowCrewActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowCrewActionDefinition) -> None:
self.flow = flow
self.definition = definition
async def run(self, *_args: Any, **kwargs: Any) -> Any:
from crewai.project.crew_loader import load_crew_from_definition
local_context = _pop_local_context(kwargs)
crew_definition = self.definition.with_
inputs = render_with_block(
self.flow, crew_definition.inputs, local_context=local_context
)
crew, _ = load_crew_from_definition(crew_definition, source="crew action")
return await crew.kickoff_async(inputs=inputs)
class ExpressionAction:
definition_type = FlowExpressionActionDefinition
@@ -197,7 +177,6 @@ _ACTION_TYPES: tuple[_ActionType, ...] = (
EachAction,
CodeAction,
ToolAction,
CrewAction,
ExpressionAction,
)

View File

@@ -14,22 +14,12 @@ from crewai.project.annotations import (
tool,
)
from crewai.project.crew_base import CrewBase
from crewai.project.crew_definition import (
CrewAgentDefinition,
CrewDefinition,
CrewTaskDefinition,
PythonReferenceDefinition,
)
from crewai.project.crew_loader import load_crew, load_crew_and_kickoff
from crewai.project.json_loader import load_agent, strip_jsonc_comments
__all__ = [
"CrewAgentDefinition",
"CrewBase",
"CrewDefinition",
"CrewTaskDefinition",
"PythonReferenceDefinition",
"after_kickoff",
"agent",
"before_kickoff",

View File

@@ -1,129 +0,0 @@
"""Definition models for inline CrewAI crew configurations."""
from __future__ import annotations
from typing import Any, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
__all__ = [
"CrewAgentDefinition",
"CrewDefinition",
"CrewTaskDefinition",
"PythonReferenceDefinition",
]
class PythonReferenceDefinition(BaseModel):
"""Dotted Python reference used by crew definitions."""
python: str
@field_validator("python")
@classmethod
def _validate_python_ref(cls, value: str) -> str:
path = value.strip()
if not path:
raise ValueError("Python reference 'python' must be a string")
if "." not in path:
raise ValueError(
f"Python reference '{path}' must be a dotted import path "
"like 'module.attribute'"
)
return path
class CrewAgentDefinition(BaseModel):
"""Inline agent definition used by a crew definition."""
model_config = ConfigDict(extra="allow")
role: str
goal: str
backstory: str
type: str | PythonReferenceDefinition | None = None
settings: dict[str, Any] = Field(default_factory=dict)
@field_validator("settings", mode="before")
@classmethod
def _validate_settings(cls, value: Any) -> Any:
if value is not None and not isinstance(value, dict):
raise ValueError("agent.settings must be a mapping")
return value or {}
class CrewTaskDefinition(BaseModel):
"""Task definition used by a crew definition."""
model_config = ConfigDict(extra="allow")
description: str
expected_output: str
name: str | None = None
agent: str | None = None
context: list[str] | None = None
type: str | PythonReferenceDefinition | None = None
_CrewAgentsInput: TypeAlias = dict[str, CrewAgentDefinition] | list[dict[str, Any]]
class CrewDefinition(BaseModel):
"""In-memory JSON/YAML crew definition with inline agents and tasks."""
model_config = ConfigDict(extra="allow")
agents: dict[str, CrewAgentDefinition]
tasks: list[CrewTaskDefinition]
inputs: dict[str, Any] = Field(default_factory=dict)
manager_agent: str | PythonReferenceDefinition | None = None
@field_validator("inputs", mode="before")
@classmethod
def _validate_inputs(cls, value: Any) -> Any:
if value is not None and not isinstance(value, dict):
raise ValueError("crew.inputs must be a mapping")
return value or {}
@field_validator(
"agents",
mode="before",
json_schema_input_type=_CrewAgentsInput,
)
@classmethod
def _validate_inline_agents(cls, value: Any) -> Any:
if isinstance(value, dict):
return value
if not isinstance(value, list):
return value
agents: dict[str, Any] = {}
for index, item in enumerate(value):
if not isinstance(item, dict):
raise ValueError(f"agents[{index}] must be an inline agent mapping")
if "name" in item:
name = item["name"]
if not isinstance(name, str) or not name:
raise ValueError(f"agents[{index}].name must be a non-empty string")
agents[name] = {key: val for key, val in item.items() if key != "name"}
continue
if len(item) != 1:
raise ValueError(
f"agents[{index}] must include a name field or be a one-key mapping"
)
name, definition = next(iter(item.items()))
agents[str(name)] = definition
return agents
@model_validator(mode="after")
def _validate_inline_shape(self) -> CrewDefinition:
if not self.agents:
raise ValueError("crew action requires inline agent definitions")
if not self.tasks:
raise ValueError("crew action requires a non-empty tasks list")
return self

View File

@@ -7,15 +7,10 @@ from typing import Any
from pydantic import ValidationError
from crewai.project.crew_definition import CrewDefinition
from crewai.project.json_loader import (
JSONAgentDefinition,
JSONCrewProject,
JSONProjectError,
JSONProjectValidationError,
_AgentDefinitionSource,
_crew_kwargs_from_definition,
_load_json_crew_project_definition,
_task_class_from_definition,
_task_kwargs_from_definition,
load_json_crew_project,
@@ -32,73 +27,12 @@ def load_crew(
default inputs. Agent definitions are resolved from individual
``<name>.jsonc`` / ``<name>.json`` files inside an ``agents/`` directory.
"""
crew_path = Path(source)
project = load_json_crew_project(crew_path, agents_dir=agents_dir)
return _load_crew_project(project, project_root=crew_path.parent)
def load_crew_from_definition(
definition: CrewDefinition | dict[str, Any],
*,
source: str | Path = "<inline crew>",
project_root: str | Path | None = None,
) -> tuple[Any, dict[str, Any]]:
"""Load a ``Crew`` from an in-memory JSON/YAML crew definition."""
root = Path(project_root) if project_root is not None else Path.cwd()
source_label = str(source)
crew_definition = (
definition
if isinstance(definition, CrewDefinition)
else CrewDefinition.model_validate(definition)
)
definition_data = crew_definition.model_dump(mode="python", exclude_none=True)
project = _crew_project_from_definition(
definition_data,
source=source_label,
project_root=root,
)
return _load_crew_project(project, project_root=root)
def _crew_project_from_definition(
definition: dict[str, Any],
*,
source: str,
project_root: Path,
) -> JSONCrewProject:
agent_bodies: dict[str, Any] = definition["agents"]
agent_names = list(agent_bodies)
manager_agent = definition.get("manager_agent")
if isinstance(manager_agent, str):
agent_names = [name for name in agent_names if name != manager_agent]
def load_agent_definition_source(agent_name: str) -> _AgentDefinitionSource | None:
body = agent_bodies.get(agent_name)
if body is None:
return None
return body, f"{source}: agents.{agent_name}"
return _load_json_crew_project_definition(
{**definition, "agents": agent_names},
source=source,
agents_dir=project_root / "agents",
project_root=project_root,
load_agent_definition_source=load_agent_definition_source,
missing_agent_hint=None,
collect_errors=False,
)
def _load_crew_project(
project: JSONCrewProject,
*,
project_root: Path,
) -> tuple[Any, dict[str, Any]]:
from crewai import Crew, Task
source_label = str(project.crew_path)
crew_path = Path(source)
project = load_json_crew_project(crew_path, agents_dir=agents_dir)
def build_agent(agent_def: JSONAgentDefinition) -> Any:
def build_agent(agent_def: Any) -> Any:
try:
return agent_def.agent_class(**agent_def.kwargs)
except ValidationError as exc:
@@ -118,26 +52,22 @@ def _load_crew_project(
task_name_map: dict[str, Task] = {}
for index, task_defn in enumerate(project.task_definitions):
task_source = f"{source_label}: tasks[{index}]"
task_class = _task_class_from_definition(
task_defn,
f"{task_source}: type",
project_root=project_root,
)
source_label = f"{crew_path}: tasks[{index}]"
task_class = _task_class_from_definition(task_defn, f"{source_label}: type")
task_kwargs = _task_kwargs_from_definition(
task_defn,
agents_map=agents_map,
task_name_map=task_name_map,
source=task_source,
project_root=project_root,
source=source_label,
project_root=crew_path.parent,
)
try:
task = task_class(**task_kwargs)
except ValidationError as exc:
raise JSONProjectError(f"{task_source}: validation failed: {exc}") from exc
raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc
except Exception as exc:
raise JSONProjectError(
f"{task_source}: failed to load task: {exc}"
f"{source_label}: failed to load task: {exc}"
) from exc
tasks_list.append(task)
@@ -150,18 +80,17 @@ def _load_crew_project(
agents=[agents_map[name] for name in project.agent_names],
tasks=tasks_list,
agents_map=agents_map,
source=source_label,
project_root=project_root,
source=crew_path,
)
try:
crew = Crew(**crew_kwargs)
except ValidationError as exc:
raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc
raise JSONProjectError(f"{crew_path}: validation failed: {exc}") from exc
except JSONProjectValidationError:
raise
except Exception as exc:
raise JSONProjectError(f"{source_label}: failed to load crew: {exc}") from exc
raise JSONProjectError(f"{crew_path}: failed to load crew: {exc}") from exc
return crew, project.definition.get("inputs", {})

File diff suppressed because it is too large Load Diff

View File

@@ -99,7 +99,7 @@ def to_serializable(
if isinstance(obj, BaseModel):
try:
return to_serializable(
obj=obj.model_dump(mode="json", exclude=exclude, serialize_as_any=True),
obj=obj.model_dump(mode="json", exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,

View File

@@ -4,14 +4,12 @@ from __future__ import annotations
import json
from pathlib import Path
import sys
import types
import pytest
from crewai.llms.base_llm import BaseLLM
from crewai.project.json_loader import JSONProjectError, JSONProjectValidationError
from crewai.project.crew_loader import load_crew, load_crew_from_definition
from crewai.project.crew_loader import load_crew
def _write_python_defs(tmp_path: Path) -> None:
@@ -72,91 +70,6 @@ def _input_file_path(value) -> Path:
class TestLoadCrew:
def test_load_crew_from_inline_definition(self):
crew, inputs = load_crew_from_definition(
{
"name": "inline_crew",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows things.",
}
},
"tasks": [
{
"name": "research",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "AI"},
}
)
assert crew.name == "inline_crew"
assert crew.agents[0].role == "Researcher"
assert crew.tasks[0].description == "Research {topic}"
assert inputs == {"topic": "AI"}
def test_inline_definition_accepts_null_inputs(self):
_, inputs = load_crew_from_definition(
{
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
"inputs": None,
}
)
assert inputs == {}
def test_inline_hierarchical_manager_agent_is_not_duplicated(self):
crew, _ = load_crew_from_definition(
{
"name": "inline_hier_manager_crew",
"agents": {
"worker": {
"role": "Worker",
"goal": "Do work",
"backstory": "Does things.",
},
"manager": {
"role": "Manager",
"goal": "Coordinate work",
"backstory": "Keeps the work moving.",
},
},
"tasks": [
{
"description": "Do work",
"expected_output": "Work done",
"agent": "manager",
}
],
"process": "hierarchical",
"manager_agent": "manager",
}
)
assert len(crew.agents) == 1
assert crew.agents[0].role == "Worker"
assert crew.manager_agent is not None
assert crew.manager_agent.role == "Manager"
assert crew.tasks[0].agent is crew.manager_agent
def test_minimal_crew(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
@@ -562,98 +475,6 @@ class TestLoadCrew:
assert "summary" in task.output_json.model_fields
assert task.converter_cls.__name__ == "SpecialConverter"
def test_crew_rejects_stdlib_python_ref_for_agent_callback(
self, tmp_path: Path
):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(
agents_dir,
"worker",
step_callback={"python": "os.system"},
)
crew_def = {
"name": "unsafe_callback_crew",
"agents": ["worker"],
"tasks": [
{
"name": "work",
"description": "Do work",
"expected_output": "Work done",
"agent": "worker",
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectError, match="project root"):
load_crew(crew_file)
def test_crew_rejects_stdlib_python_ref_for_mcp_tool_filter(
self, tmp_path: Path
):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(
agents_dir,
"worker",
mcps=[
{
"command": "python",
"args": ["server.py"],
"tool_filter": {"python": "os.system"},
}
],
)
crew_def = {
"name": "unsafe_mcp_filter_crew",
"agents": ["worker"],
"tasks": [
{
"name": "work",
"description": "Do work",
"expected_output": "Work done",
"agent": "worker",
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectError, match="project root"):
load_crew(crew_file)
def test_crew_rejects_callable_python_ref_for_object_field(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
_write_python_defs(tmp_path)
monkeypatch.syspath_prepend(str(tmp_path))
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(
agents_dir,
"worker",
security_config={"python": "json_refs.always_true"},
)
crew_def = {
"name": "unsafe_object_ref_crew",
"agents": ["worker"],
"tasks": [
{
"name": "work",
"description": "Do work",
"expected_output": "Work done",
"agent": "worker",
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectError, match="supported object reference"):
load_crew(crew_file)
def test_crew_loads_project_relative_input_files(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
@@ -689,147 +510,6 @@ class TestLoadCrew:
assert _input_file_path(input_files["brief"]) == brief_path
assert _input_file_path(input_files["spec"]) == spec_path
def test_crew_rejects_relative_input_file_outside_project(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "reader")
crew_def = {
"name": "unsafe_input_files_crew",
"agents": ["reader"],
"tasks": [
{
"name": "read",
"description": "Read files",
"expected_output": "File summary",
"agent": "reader",
"input_files": {"secret": "../secret.txt"},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectValidationError, match="outside the project root"):
load_crew(crew_file)
def test_crew_rejects_absolute_input_file_outside_project(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "reader")
outside_path = tmp_path.parent / "secret.txt"
crew_def = {
"name": "unsafe_absolute_input_files_crew",
"agents": ["reader"],
"tasks": [
{
"name": "read",
"description": "Read files",
"expected_output": "File summary",
"agent": "reader",
"input_files": {"secret": str(outside_path)},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectValidationError, match="outside the project root"):
load_crew(crew_file)
def test_crew_rejects_file_uri_input_file_outside_project(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "reader")
outside_uri = (tmp_path.parent / "secret.txt").as_uri()
crew_def = {
"name": "unsafe_file_uri_input_files_crew",
"agents": ["reader"],
"tasks": [
{
"name": "read",
"description": "Read files",
"expected_output": "File summary",
"agent": "reader",
"input_files": {"secret": outside_uri},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectValidationError, match="outside the project root"):
load_crew(crew_file)
@pytest.mark.parametrize(
"outside_path",
[
r"C:\Users\alice\.ssh\id_rsa",
"C:/Users/alice/.ssh/id_rsa",
r"\\server\share\secret.txt",
"//server/share/secret.txt",
],
)
def test_crew_rejects_windows_input_file_outside_project(
self, tmp_path: Path, outside_path: str
):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "reader")
crew_def = {
"name": "unsafe_windows_input_files_crew",
"agents": ["reader"],
"tasks": [
{
"name": "read",
"description": "Read files",
"expected_output": "File summary",
"agent": "reader",
"input_files": {"secret": outside_path},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
with pytest.raises(JSONProjectValidationError, match="outside the project root"):
load_crew(crew_file)
def test_crew_restores_external_module_cache_after_project_ref(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
_write_python_defs(tmp_path)
external_module = types.ModuleType("json_refs")
external_module.__file__ = str(tmp_path.parent / "json_refs.py")
external_module.marker = "external"
monkeypatch.setitem(sys.modules, "json_refs", external_module)
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(
agents_dir,
"worker",
step_callback={"python": "json_refs.task_callback"},
)
crew_def = {
"name": "cache_restore_crew",
"agents": ["worker"],
"tasks": [
{
"name": "work",
"description": "Do work",
"expected_output": "Work done",
"agent": "worker",
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
assert crew.agents[0].step_callback.__name__ == "task_callback"
assert sys.modules["json_refs"] is external_module
def test_missing_agent_file_raises(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()

View File

@@ -11,7 +11,6 @@ import pytest
from crewai.llms.base_llm import BaseLLM
from crewai.project.json_loader import (
JSONProjectValidationError,
_looks_like_windows_absolute_path,
find_json_project_file,
load_agent,
strip_jsonc_comments,
@@ -75,31 +74,6 @@ def test_find_json_project_file_prefers_jsonc(tmp_path: Path):
assert find_json_project_file(tmp_path, "agent") == jsonc_path
@pytest.mark.parametrize(
"path_value",
[
r"C:\Users\alice\.ssh\id_rsa",
"C:/Users/alice/.ssh/id_rsa",
r"\\server\share\secret.txt",
"//server/share/secret.txt",
],
)
def test_windows_absolute_path_detection(path_value: str):
assert _looks_like_windows_absolute_path(path_value)
@pytest.mark.parametrize(
"path_value",
[
r"folder\file.txt",
"folder/file.txt",
r"\server\share\secret.txt",
],
)
def test_windows_absolute_path_detection_ignores_relative_paths(path_value: str):
assert not _looks_like_windows_absolute_path(path_value)
class TestLoadAgent:
def test_load_minimal_agent(self, tmp_path: Path):
agent_def = {
@@ -506,28 +480,6 @@ class TestValidationDoesNotExecuteTools:
assert "Invalid custom tool name" in str(exc_info.value)
def test_validate_rejects_deep_python_ref_nesting(self, tmp_path):
from crewai.project.json_loader import validate_crew_project
crew_path = self._write_project(
tmp_path,
tool_line='{"tool_type": "some.module.Tool"}',
)
agent_file = tmp_path / "agents" / "worker.jsonc"
agent_def = json.loads(agent_file.read_text())
nested: dict[str, object] = {}
current = nested
for _ in range(70):
child: dict[str, object] = {}
current["nested"] = child
current = child
current["ref"] = {"python": "callbacks.step_callback"}
agent_def["security_config"] = nested
agent_file.write_text(json.dumps(agent_def))
with pytest.raises(JSONProjectValidationError, match="maximum depth"):
validate_crew_project(crew_path, tmp_path / "agents")
class TestCustomToolPathSafety:
@pytest.mark.parametrize(

View File

@@ -3830,6 +3830,7 @@ def test_crew_testing_function(researcher):
assert isinstance(received_events[1], CrewTestCompletedEvent)
@pytest.mark.vcr()
def test_hierarchical_verbose_manager_agent(researcher, writer):
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
@@ -3844,18 +3845,13 @@ def test_hierarchical_verbose_manager_agent(researcher, writer):
verbose=True,
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
task.output = mock_task_output
with patch.object(Task, "execute_sync", return_value=mock_task_output):
crew.kickoff()
crew.kickoff()
assert crew.manager_agent is not None
assert crew.manager_agent.verbose
@pytest.mark.vcr()
def test_hierarchical_verbose_false_manager_agent(researcher, writer):
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
@@ -3870,13 +3866,7 @@ def test_hierarchical_verbose_false_manager_agent(researcher, writer):
verbose=False,
)
mock_task_output = TaskOutput(
description="Mock description", raw="mocked output", agent="mocked agent", messages=[]
)
task.output = mock_task_output
with patch.object(Task, "execute_sync", return_value=mock_task_output):
crew.kickoff()
crew.kickoff()
assert crew.manager_agent is not None
assert not crew.manager_agent.verbose

View File

@@ -41,7 +41,6 @@ def test_flow_public_exports_are_explicit():
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",

View File

@@ -765,252 +765,6 @@ methods:
)
def test_crew_action_runs_inline_yaml_definition(monkeypatch: pytest.MonkeyPatch):
from crewai import Crew
async def fake_kickoff_async(
self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any
) -> dict[str, Any]:
return {
"crew": self.name,
"agents": [agent.role for agent in self.agents],
"tasks": [task.description for task in self.tasks],
"inputs": inputs,
}
monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: CrewFlow
methods:
research:
do:
call: crew
with:
name: inline_research
agents:
researcher:
role: Researcher
goal: Research {topic}
backstory: Knows things.
tasks:
- name: research_task
description: Research {topic}
expected_output: Findings about {topic}
agent: researcher
inputs:
topic: "${state.topic}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "inline_research",
"agents": ["Researcher"],
"tasks": ["Research {topic}"],
"inputs": {"topic": "AI"},
}
def test_crew_action_round_trips_with_inline_definition():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"name": "inline_research",
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows things.",
}
},
"tasks": [
{
"name": "research_task",
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
"inputs": {"topic": "${state.topic}"},
},
},
}
},
}
)
assert definition.to_dict()["methods"]["research"]["do"]["call"] == "crew"
assert (
definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][
"researcher"
]["role"]
== "Researcher"
)
def test_crew_action_normalizes_named_agent_list_definition():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"agents": [
{
"name": "researcher",
"role": "Researcher",
"goal": "Research {topic}",
"backstory": "Knows things.",
}
],
"tasks": [
{
"description": "Research {topic}",
"expected_output": "Findings about {topic}",
"agent": "researcher",
}
],
},
},
}
},
}
)
assert (
definition.to_dict()["methods"]["research"]["do"]["with"]["agents"][
"researcher"
]["role"]
== "Researcher"
)
def test_crew_action_json_schema_describes_inline_crew_definitions():
schema_defs = FlowDefinition.json_schema()["$defs"]
agents_schema = schema_defs["CrewDefinition"]["properties"]["agents"]
assert set(schema_defs["CrewDefinition"]["properties"]) >= {
"agents",
"tasks",
"inputs",
}
assert {option["type"] for option in agents_schema["anyOf"]} == {"array", "object"}
assert set(schema_defs["CrewAgentDefinition"]["properties"]) >= {
"role",
"goal",
"backstory",
"settings",
}
assert set(schema_defs["CrewTaskDefinition"]["properties"]) >= {
"description",
"expected_output",
"agent",
"context",
}
def test_crew_action_rejects_incomplete_inline_agent_definition():
with pytest.raises(ValidationError, match="goal"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"agents": {
"researcher": {
"role": "Researcher",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
},
},
}
},
}
)
def test_crew_action_rejects_ref():
with pytest.raises(ValidationError, match="ref"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"ref": "project.crew:build_crew",
"with": {"inputs": {"topic": "AI"}},
},
}
},
}
)
def test_crew_action_rejects_non_mapping_inputs_in_definition():
with pytest.raises(ValidationError, match="crew.inputs must be a mapping"):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"agents": {
"researcher": {
"role": "Researcher",
"goal": "Research",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
"inputs": "topic",
},
},
}
},
}
)
def test_tool_action_reports_invalid_cel_expression():
yaml_str = f"""
schema: crewai.flow/v1

View File

@@ -21,10 +21,6 @@ class Person(BaseModel):
skills: List[str]
class Container(BaseModel):
payload: BaseModel | None = None
@dataclass
class DataclassPerson:
name: str
@@ -118,16 +114,6 @@ def test_pydantic_model_serialization():
)
def test_polymorphic_field_serializes_concrete_subclass():
container = Container(
payload=Address(street="1 Main", city="Tech City", country="Pythonia")
)
assert to_serializable(container) == {
"payload": {"street": "1 Main", "city": "Tech City", "country": "Pythonia"}
}
def test_dataclass_serialization_recurses_into_nested_values():
person = DataclassPerson(
name="Ada",