Compare commits

..

3 Commits

Author SHA1 Message Date
Vinicius Brasil
b9586874b3 Add lead scoring FlowDefinition example 2026-06-16 10:54:34 -07:00
João Moura
e9d568dc69 Deep Crew / Agent / Task attributes support on json (#6172)
Some checks failed
Build uv cache / build-cache (3.10) (push) Waiting to run
Build uv cache / build-cache (3.11) (push) Waiting to run
Build uv cache / build-cache (3.12) (push) Waiting to run
Build uv cache / build-cache (3.13) (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
* Enhance JSON crew project handling and validation

- Updated `create_json_crew.py` to specify input files with a brief path.
- Refactored `crew_loader.py` to improve agent and task loading logic, including the introduction of a `build_agent` function and better handling of task classes.
- Enhanced `json_loader.py` with additional validation for agent and task definitions, including support for Python references and conditional tasks.
- Added tests in `test_crew_loader.py` and `test_json_loader.py` to ensure proper loading of agents, tasks, and validation of project structures, including custom types and conditional tasks.
- Improved error handling and validation safety across the project loading process.

* Enhance JSON crew configuration options in create_json_crew.py

- Added optional fields for custom agent subclasses and advanced task options, including condition checks and output specifications.
- Improved documentation comments for better clarity on agent and task configurations.
- Updated JSON crew handling to support additional callbacks for pre- and post-execution processes.

* Enhance JSON crew template tests in test_create_crew.py

- Added assertions for new optional fields in crew and agent templates, including conditional tasks, custom converters, and input file specifications.
- Improved validation checks for manager agents and callback references to ensure proper configuration in JSON crew definitions.
- Expanded documentation references within the tests to provide clearer guidance on the expected structure and usage of crew templates.

* Fix JSON crew PR review issues
2026-06-16 02:00:19 -03:00
Vinicius Brasil
fe2c236601 Add each composite action to FlowDefinition (#6164)
Lets a definition loop over an array without writing Python. Each
iteration exposes `item` and prior steps `outputs`.

```yaml
do:
  call: each
  in: state.rows
  do:
    - normalize:
        call: tool
        ref: my_tools:NormalizeRowTool
        with: { row: "${ item }" }
    - lead_scoring:
        call: agent
        # ...
```
2026-06-15 21:44:33 -07:00
20 changed files with 2150 additions and 220 deletions

View File

@@ -705,6 +705,9 @@ def _agent_to_jsonc(agent: dict[str, Any]) -> str:
// Example: "role": "Senior {{industry}} Researcher"
"role": {json.dumps(agent["role"])},
// Optional custom Agent subclass
// "type": {{"python": "my_project.agents.CustomAgent"}},
// The agent's primary objective
"goal": {json.dumps(agent["goal"])},
@@ -728,7 +731,9 @@ def _agent_to_jsonc(agent: dict[str, Any]) -> str:
// Optional agent-level guardrail — validates this agent's final output.
// String guardrails are checked by an LLM and can reject/retry output.
// Python refs must point to module-level functions/classes in trusted code.
// "guardrail": "Only answer with information supported by retrieved evidence.",
// "step_callback": {{"python": "my_project.callbacks.on_agent_step"}},
// "guardrail_max_retries": 2,
// Advanced agent options:
@@ -786,11 +791,20 @@ def _task_to_json_fragment(task: dict[str, Any]) -> str:
lines.append("")
lines.append(" // Advanced task options:")
lines.append(" // Docs: https://docs.crewai.com/concepts/tasks")
lines.append(' // "output_json": null,')
lines.append(' // "type": "ConditionalTask",')
lines.append(
' // "condition": { "python": "my_project.conditions.should_run" },'
)
lines.append(
' // "output_json": { "python": "my_project.models.ReportOutput" },'
)
lines.append(' // "output_pydantic": null,')
lines.append(' // "response_model": null,')
lines.append(
' // "converter_cls": { "python": "my_project.converters.CustomConverter" },'
)
lines.append(' // "markdown": false,')
lines.append(' // "input_files": [],')
lines.append(' // "input_files": { "brief": "data/brief.txt" },')
lines.append(' // "security_config": {},')
lines.append("")
lines.append(" // Which agent handles this task")
@@ -874,7 +888,11 @@ def _crew_to_jsonc(
// Advanced crew options:
// Docs: https://docs.crewai.com/concepts/crews
// For hierarchical crews, manager_agent can reference an agents/<name>.jsonc file
// that is not included in the "agents" list.
// "manager_agent": "{agents[0]["name"]}",
// "before_kickoff_callbacks": [{{"python": "my_project.callbacks.before_kickoff"}}],
// "after_kickoff_callbacks": [{{"python": "my_project.callbacks.after_kickoff"}}],
// "function_calling_llm": "openai/gpt-4o-mini",
// "max_rpm": null,
// "cache": true,

View File

@@ -721,9 +721,30 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch):
assert '"guardrail_max_retries": 2' in crew_template
assert "Docs: https://docs.crewai.com/concepts/tasks" in crew_template
assert '"output_pydantic": null' in crew_template
assert '"type": "ConditionalTask"' in crew_template
assert '"condition": { "python": "my_project.conditions.should_run" }' in (
crew_template
)
assert '"output_json": { "python": "my_project.models.ReportOutput" }' in (
crew_template
)
assert (
'"converter_cls": { "python": "my_project.converters.CustomConverter" }'
in crew_template
)
assert '"markdown": false' in crew_template
assert '"input_files": { "brief": "data/brief.txt" }' in crew_template
assert "Docs: https://docs.crewai.com/concepts/crews" in crew_template
assert "manager_agent can reference an agents/<name>.jsonc file" in crew_template
assert '"manager_agent": "researcher"' in crew_template
assert (
'"before_kickoff_callbacks": [{"python": '
'"my_project.callbacks.before_kickoff"}]'
) in crew_template
assert (
'"after_kickoff_callbacks": [{"python": '
'"my_project.callbacks.after_kickoff"}]'
) in crew_template
assert '"output_log_file": "crew.log"' in crew_template
assert "Crew-level LLM fields also accept object form" in crew_template
assert '"chat_llm": {"model": "llama3", "provider": "ollama"' in (
@@ -740,7 +761,13 @@ def test_json_create_provider_preselects_default_model(tmp_path, monkeypatch):
agent_template
)
assert '"role": "Senior {industry} Researcher"' in agent_template
assert '"type": {"python": "my_project.agents.CustomAgent"}' in agent_template
assert "Optional agent-level guardrail" in agent_template
assert "Python refs must point to module-level functions/classes" in agent_template
assert (
'"step_callback": {"python": "my_project.callbacks.on_agent_step"}'
in agent_template
)
assert '"guardrail_max_retries": 2' in agent_template
assert "Docs: https://docs.crewai.com/concepts/agents" in agent_template
assert '"reasoning": true' in agent_template

View File

@@ -0,0 +1,23 @@
import logging
from typing import Literal
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
logger = logging.getLogger("lead_flow")
class LogLeadInput(BaseModel):
message: str = Field(description="The message to log.")
level: Literal["debug", "info", "warning", "error"] = "info"
class LogLeadTool(BaseTool):
name: str = "log_lead"
description: str = "Log a message about a lead that was not pursued."
args_schema: type[BaseModel] = LogLeadInput
def _run(self, message: str, level: str = "info") -> str:
logger.log(logging.getLevelName(level.upper()), message)
return message

View File

@@ -0,0 +1,98 @@
# uv run --project lib/crewai crewai run --definition lib/crewai/examples/flows/lead_scoring_flow.yaml --inputs '{"lead":{"name":"Dana Lee","company":"Acme","employees":1200}}'
# uv run --project lib/crewai crewai run --definition lib/crewai/examples/flows/lead_scoring_flow.yaml --inputs '{"lead":{"name":"Sam Poe","company":"Tiny LLC","employees":3}}'
schema: crewai.flow/v1
name: LeadScoringFlow
description: Score an inbound lead, then route high-scoring leads to outreach and the rest to a log tool.
state:
type: dict
default:
lead: {}
methods:
score_lead:
start: true
do:
call: crew
with:
name: lead_scoring_crew
verbose: true
agents:
scorer:
role: Lead Qualification Analyst
goal: Assign a 0-100 fit score to inbound lead {name} from {company}
backstory: >
A revenue-ops veteran who scores leads against a clear ideal
customer profile: company size is the dominant signal.
tasks:
- name: score_lead_task
agent: scorer
description: >
Evaluate the inbound lead {name} from {company} ({employees}
employees) against this rubric, where company size dominates:
1000+ employees scores 85-100 (hot), 200-999 scores 70-84 (warm),
and under 200 scores 0-69 (cold). Return an integer score with a
one-line rationale.
expected_output: >
A LeadScore with an integer `score` (0-100), a short `reasoning`,
and a `tier` of "hot", "warm", or "cold".
output_pydantic:
type: object
properties:
score:
type: integer
reasoning:
type: string
tier:
type: string
enum: [hot, warm, cold]
required: [score, reasoning, tier]
inputs:
name: "${state.lead.name}"
company: "${state.lead.company}"
employees: "${state.lead.employees}"
route_by_score:
listen: score_lead
router: true
emit: [qualified, unqualified]
do:
call: expression
expr: "outputs.score_lead.pydantic.score >= 80 ? 'qualified' : 'unqualified'"
run_outreach:
listen: qualified
do:
call: crew
with:
name: outreach_crew
verbose: true
agents:
sdr:
role: Outbound SDR
goal: Draft a tailored first-touch email to {name} at {company}
backstory: >
A top-performing SDR who writes concise, personalized outreach
that earns replies from busy buyers.
tasks:
- name: draft_outreach_task
agent: sdr
description: >
Write a short, personalized first-touch email to {name} at
{company}. Ground the hook in this qualification rationale:
"{reasoning}".
expected_output: A ready-to-send outreach email with a subject line and body.
inputs:
name: "${state.lead.name}"
company: "${state.lead.company}"
reasoning: "${outputs.score_lead.pydantic.reasoning}"
log_unqualified:
listen: unqualified
do:
call: tool
ref: lead_flow.tools:LogLeadTool
with:
message: "${'Skipped low-fit lead ' + state.lead.name + ' (score ' + string(outputs.score_lead.pydantic.score) + ')'}"
level: info

View File

@@ -11,9 +11,17 @@ from __future__ import annotations
import json
import logging
import re
from typing import Any, Literal as TypingLiteral
from pydantic import BaseModel, ConfigDict, Field, field_serializer, model_validator
from pydantic import (
BaseModel,
ConfigDict,
Field,
RootModel,
field_serializer,
model_validator,
)
import yaml
from crewai.flow.conversational_definition import (
@@ -25,6 +33,7 @@ from crewai.flow.conversational_definition import (
logger = logging.getLogger(__name__)
FlowDefinitionCondition = str | dict[str, Any]
_STEP_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
__all__ = [
"FlowActionDefinition",
@@ -35,6 +44,8 @@ __all__ = [
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowMethodDefinition",
@@ -148,10 +159,11 @@ class FlowHumanFeedbackDefinition(BaseModel):
class FlowCodeActionDefinition(BaseModel):
"""A Flow method action that executes importable Python code."""
model_config = ConfigDict(extra="forbid")
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: TypingLiteral["code"] = "code"
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
class FlowToolActionDefinition(BaseModel):
@@ -173,14 +185,66 @@ class FlowExpressionActionDefinition(BaseModel):
expr: str
FlowActionDefinition = (
FlowInnerActionDefinition = (
FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition
)
class FlowEachInnerActionDefinition(RootModel[dict[str, FlowInnerActionDefinition]]):
"""One named action inside an ``each`` composite action."""
@model_validator(mode="after")
def _validate_action_mapping(self) -> FlowEachInnerActionDefinition:
if len(self.root) != 1:
raise ValueError("each.do entries must be one-key mappings")
_validate_step_name(self.name, field="each.do action names")
return self
@property
def name(self) -> str:
return next(iter(self.root))
@property
def action(self) -> FlowInnerActionDefinition:
return next(iter(self.root.values()))
class FlowEachActionDefinition(BaseModel):
"""A composite action that runs a sequential mini-pipeline for each item."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: TypingLiteral["each"]
in_: str = Field(alias="in")
do: list[FlowEachInnerActionDefinition]
@model_validator(mode="after")
def _validate_inner_action_list(self) -> FlowEachActionDefinition:
if not self.do:
raise ValueError("each.do must contain at least one action")
seen: set[str] = set()
for inner_action in self.do:
name = inner_action.name
if name in seen:
raise ValueError(f"each.do action names must be unique: {name!r}")
seen.add(name)
return self
FlowActionDefinition = (
FlowCodeActionDefinition
| FlowToolActionDefinition
| FlowExpressionActionDefinition
| FlowEachActionDefinition
)
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""
description: str | None = None
do: FlowActionDefinition
start: bool | FlowDefinitionCondition | None = None
listen: FlowDefinitionCondition | None = None
@@ -227,6 +291,12 @@ class FlowDefinition(BaseModel):
methods: dict[str, FlowMethodDefinition] = Field(default_factory=dict)
diagnostics: list[FlowDefinitionDiagnostic] = Field(default_factory=list)
@model_validator(mode="after")
def _validate_method_names(self) -> FlowDefinition:
for method_name in self.methods:
_validate_step_name(method_name, field="Flow method names")
return self
def to_dict(self, *, exclude_none: bool = True) -> dict[str, Any]:
"""Serialize the definition to a JSON/YAML-ready dictionary."""
return self.model_dump(by_alias=True, exclude_none=exclude_none, mode="json")
@@ -369,6 +439,11 @@ def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]:
return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []]
def _validate_step_name(name: str, *, field: str) -> None:
if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name):
raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
def _merge_diagnostics(
*diagnostic_groups: list[FlowDefinitionDiagnostic],
) -> list[FlowDefinitionDiagnostic]:

View File

@@ -121,11 +121,8 @@ from crewai.flow.human_feedback import (
)
from crewai.flow.input_provider import InputProvider
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.runtime._resolvers import (
resolve_action,
resolve_instance_ref,
resolve_ref,
)
from crewai.flow.runtime._actions import build_action
from crewai.flow.runtime._refs import resolve_instance_ref, resolve_ref
from crewai.flow.types import (
FlowExecutionData,
FlowMethodName,
@@ -1092,9 +1089,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self._methods.update(methods)
def _action_bound_methods(self) -> dict[FlowMethodName, Callable[..., Any]]:
def resolve(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
def build(name: str, definition: FlowMethodDefinition) -> Callable[..., Any]:
try:
return resolve_action(self, definition.do)
return build_action(self, definition.do)
except Exception as e:
unresolved.append(f"{name}: {e}")
return lambda *args, **kwargs: None
@@ -1102,9 +1099,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
methods: dict[FlowMethodName, Callable[..., Any]] = {}
unresolved: list[str] = []
for method_name, method_definition in self._definition.methods.items():
methods[FlowMethodName(method_name)] = resolve(
method_name, method_definition
)
methods[FlowMethodName(method_name)] = build(method_name, method_definition)
if unresolved:
raise ValueError(
f"Cannot build flow {self._definition.name!r} from its definition; "

View File

@@ -0,0 +1,221 @@
"""Build FlowDefinition actions into live runtime callables."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
import contextvars
import inspect
from typing import TYPE_CHECKING, Any, Protocol, cast
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowEachActionDefinition,
FlowEachInnerActionDefinition,
FlowExpressionActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
from crewai.flow.runtime._refs import InvalidRefError, resolve_ref
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
__all__ = ["build_action"]
LocalContext = dict[str, Any]
_LOCAL_CONTEXT_KWARG = "__flow_definition_local_context"
class _BuiltAction(Protocol):
def run(self, *args: Any, **kwargs: Any) -> Any: ...
class _ActionType(Protocol):
definition_type: type[Any]
def __call__(self, flow: Flow[Any], definition: Any) -> _BuiltAction: ...
class CodeAction:
definition_type = FlowCodeActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowCodeActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.handler = self._resolve_handler()
self.signature = inspect.signature(self.handler)
def run(self, *args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
if self.definition.with_ is None:
return self.handler(*args, **kwargs)
return self.handler(
**render_with_block(
self.flow, self.definition.with_, local_context=local_context
)
)
def _resolve_handler(self) -> Callable[..., Any]:
ref = self.definition.ref
target = resolve_ref(ref, field="do")
if not callable(target):
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None and hasattr(handler, "__get__"):
handler = handler.__get__(self.flow, type(self.flow))
return handler
class ToolAction:
definition_type = FlowToolActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowToolActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.tool = self._build_tool()
self.kwargs = definition.with_ or {}
def run(self, *_args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return self.tool.run(
**render_with_block(self.flow, self.kwargs, local_context=local_context)
)
def _build_tool(self) -> Any:
target = resolve_ref(self.definition.ref, field="do")
from crewai.tools import BaseTool
if not (inspect.isclass(target) and issubclass(target, BaseTool)):
raise InvalidRefError(
f"invalid tool ref {self.definition.ref!r}; expected a BaseTool class"
)
try:
tool_cls = cast(Callable[[], BaseTool], target)
return tool_cls()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate tool ref {self.definition.ref!r} "
f"without arguments: {e}"
) from e
class ExpressionAction:
definition_type = FlowExpressionActionDefinition
def __init__(
self, flow: Flow[Any], definition: FlowExpressionActionDefinition
) -> None:
self.flow = flow
self.definition = definition
def run(self, *_args: Any, **kwargs: Any) -> Any:
local_context = _pop_local_context(kwargs)
return evaluate_expression(
self.flow, self.definition.expr, local_context=local_context
)
class EachAction:
definition_type = FlowEachActionDefinition
def __init__(self, flow: Flow[Any], definition: FlowEachActionDefinition) -> None:
self.flow = flow
self.definition = definition
self.inner_actions = [
(inner_action.name, self._build_inner_action(inner_action))
for inner_action in definition.do
]
async def run(self, *_args: Any, **_kwargs: Any) -> list[Any]:
items = evaluate_expression(self.flow, self.definition.in_)
if not isinstance(items, list):
raise ValueError("each.in must evaluate to an array")
results: list[Any] = []
for item in items:
local_outputs: dict[str, Any] = {}
last_output: Any = None
for name, run_inner_action in self.inner_actions:
last_output = await run_inner_action(
{"item": item, "outputs": local_outputs}
)
local_outputs[name] = last_output
results.append(last_output)
return results
def _build_inner_action(
self, inner_action: FlowEachInnerActionDefinition
) -> Callable[[LocalContext], Any]:
run_action = build_action(self.flow, inner_action.action)
async def run_inner_action(local_context: LocalContext) -> Any:
kwargs = {_LOCAL_CONTEXT_KWARG: local_context}
if inspect.iscoroutinefunction(run_action):
result = run_action(**kwargs)
else:
ctx = contextvars.copy_context()
def run_with_context() -> Any:
return run_action(**kwargs)
result = await asyncio.to_thread(ctx.run, run_with_context)
if inspect.isawaitable(result):
result = await result
return result
return run_inner_action
_ACTION_TYPES: tuple[_ActionType, ...] = (
EachAction,
CodeAction,
ToolAction,
ExpressionAction,
)
def build_action(
flow: Flow[Any], definition: FlowActionDefinition
) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
for action_type in _ACTION_TYPES:
if isinstance(definition, action_type.definition_type):
return _as_flow_method(action_type(flow, definition))
raise ValueError(f"unknown call type {getattr(definition, 'call', None)!r}")
def _as_flow_method(action: _BuiltAction) -> Callable[..., Any]:
run: Callable[..., Any]
if inspect.iscoroutinefunction(action.run):
async def run_async(*args: Any, **kwargs: Any) -> Any:
return await action.run(*args, **kwargs)
run = run_async
else:
def run_sync(*args: Any, **kwargs: Any) -> Any:
return action.run(*args, **kwargs)
run = run_sync
signature = getattr(action, "signature", None)
if signature is not None:
object.__setattr__(run, "__signature__", signature)
return run
def _pop_local_context(kwargs: dict[str, Any]) -> LocalContext | None:
local_context = kwargs.pop(_LOCAL_CONTEXT_KWARG, None)
if local_context is None:
return None
if not isinstance(local_context, dict):
raise TypeError("flow definition local context must be a mapping")
return cast(LocalContext, local_context)

View File

@@ -2,14 +2,12 @@
from __future__ import annotations
import copy
import dataclasses
from itertools import pairwise
import json
import re
from typing import TYPE_CHECKING, Any, cast
from pydantic import BaseModel
from crewai.utilities.serialization import to_serializable
if TYPE_CHECKING:
@@ -25,25 +23,45 @@ class FlowExpressionError(ValueError):
"""A FlowDefinition expression failed to parse or evaluate."""
def render_with_block(flow: Flow[Any], value: Any) -> Any:
def render_with_block(
flow: Flow[Any], value: Any, local_context: dict[str, Any] | None = None
) -> Any:
"""Render CEL expressions inside a FlowDefinition ``with:`` payload."""
context = _expression_context(flow)
context = _expression_context(flow, local_context=local_context)
return _render_value(value, context)
def evaluate_expression(flow: Flow[Any], expression: str) -> Any:
def evaluate_expression(
flow: Flow[Any], expression: str, local_context: dict[str, Any] | None = None
) -> Any:
"""Evaluate a FlowDefinition CEL expression against runtime context."""
expression = expression.strip()
if not expression:
raise FlowExpressionError("empty CEL expression")
return _eval_cel(expression, _expression_context(flow))
return _eval_cel(expression, _expression_context(flow, local_context=local_context))
def _expression_context(flow: Flow[Any]) -> dict[str, Any]:
return {
def _expression_context(
flow: Flow[Any], local_context: dict[str, Any] | None = None
) -> dict[str, Any]:
outputs = _outputs_by_name(flow._method_outputs)
context: dict[str, Any] = {
"state": flow._copy_and_serialize_state(),
"outputs": _outputs_by_name(flow._method_outputs),
"outputs": outputs,
}
if local_context:
local_values = {
key: to_serializable(value, max_depth=0)
for key, value in local_context.items()
}
local_outputs = local_values.pop("outputs", None)
local_values.pop("state", None)
context.update(local_values)
if local_outputs is not None:
if not isinstance(local_outputs, dict):
raise TypeError("flow definition local outputs must be a mapping")
context["outputs"] = {**outputs, **local_outputs}
return context
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
@@ -54,12 +72,7 @@ def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
if isinstance(entry, dict) and "output" in entry:
method = str(entry.get("method", ""))
output = entry["output"]
output = copy.deepcopy(output)
if isinstance(output, BaseModel):
output = output.model_dump(mode="json")
elif dataclasses.is_dataclass(output) and not isinstance(output, type):
output = dataclasses.asdict(output)
outputs[method] = output
outputs[method] = to_serializable(output, max_depth=0)
return outputs

View File

@@ -0,0 +1,38 @@
"""Resolution of ``module:qualname`` refs into live Python objects."""
from __future__ import annotations
import importlib
import inspect
from operator import attrgetter
from typing import Any
class InvalidRefError(ValueError):
"""A definition ref that cannot be resolved to a live object."""
def resolve_ref(ref: str, *, field: str) -> Any:
"""Import the object a definition's `module:qualname` ref points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidRefError(
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
def resolve_instance_ref(ref: str, *, field: str) -> Any:
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
target = resolve_ref(ref, field=field)
if not inspect.isclass(target):
return target
try:
return target()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
) from e

View File

@@ -1,116 +0,0 @@
"""Resolution of FlowDefinition refs (``module:qualname``) into live objects.
Every ref-shaped value in a definition — ``do`` actions, ``state.ref``,
``config.input_provider``, ``human_feedback.provider`` — resolves through
:func:`resolve_ref`. Failures are loud and name the field and the ref.
"""
from __future__ import annotations
from collections.abc import Callable
import importlib
import inspect
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowExpressionActionDefinition,
FlowToolActionDefinition,
)
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
if TYPE_CHECKING:
from crewai.flow.runtime import Flow
class InvalidRefError(ValueError):
"""A definition ref that cannot be resolved to a live object."""
def resolve_ref(ref: str, *, field: str) -> Any:
"""Import the object a definition's `module:qualname` ref points to."""
module_name, _, qualname = ref.partition(":")
if "<" in ref or not module_name or not qualname:
raise InvalidRefError(
f"invalid {field} ref {ref!r}; expected 'module:qualname'"
)
try:
return attrgetter(qualname)(importlib.import_module(module_name))
except (ImportError, AttributeError) as e:
raise InvalidRefError(f"unresolvable {field} ref {ref!r}") from e
def resolve_instance_ref(ref: str, *, field: str) -> Any:
"""Resolve a ref, auto-instantiating a no-arg class into an instance."""
target = resolve_ref(ref, field=field)
if not inspect.isclass(target):
return target
try:
return target()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate {field} ref {ref!r} without arguments: {e}"
) from e
def _resolve_code_action(
flow: Flow[Any], action: FlowCodeActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = resolve_ref(ref, field="do")
if not callable(target):
raise InvalidRefError(f"invalid do ref {ref!r}; object is not callable")
handler = cast(Callable[..., Any], target)
if getattr(handler, "__self__", None) is None:
handler = handler.__get__(flow, type(flow))
return handler
def _resolve_tool_action(
flow: Flow[Any], action: FlowToolActionDefinition
) -> Callable[..., Any]:
target = resolve_ref(action.ref, field="do")
from crewai.tools import BaseTool
if not (inspect.isclass(target) and issubclass(target, BaseTool)):
raise InvalidRefError(
f"invalid tool ref {action.ref!r}; expected a BaseTool class"
)
try:
tool_cls = cast(Callable[[], BaseTool], target)
tool = tool_cls()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate tool ref {action.ref!r} without arguments: {e}"
) from e
tool_kwargs = action.with_ or {}
def run_tool(*_args: Any, **_kwargs: Any) -> Any:
return tool.run(**render_with_block(flow, tool_kwargs))
return run_tool
def _resolve_expression_action(
flow: Flow[Any], action: FlowExpressionActionDefinition
) -> Callable[..., Any]:
def run_expression(*_args: Any, **_kwargs: Any) -> Any:
return evaluate_expression(flow, action.expr)
return run_expression
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
if action.call == "tool":
return _resolve_tool_action(flow, action)
if action.call == "expression":
return _resolve_expression_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -11,6 +11,7 @@ from crewai.project.json_loader import (
JSONProjectError,
JSONProjectValidationError,
_crew_kwargs_from_definition,
_task_class_from_definition,
_task_kwargs_from_definition,
load_json_crew_project,
)
@@ -26,16 +27,14 @@ def load_crew(
default inputs. Agent definitions are resolved from individual
``<name>.jsonc`` / ``<name>.json`` files inside an ``agents/`` directory.
"""
from crewai import Agent, Crew, Task
from crewai import Crew, Task
crew_path = Path(source)
project = load_json_crew_project(crew_path, agents_dir=agents_dir)
agents_map: dict[str, Any] = {}
for name in project.agent_names:
agent_def = project.agents[name]
def build_agent(agent_def: Any) -> Any:
try:
agents_map[name] = Agent(**agent_def.kwargs)
return agent_def.agent_class(**agent_def.kwargs)
except ValidationError as exc:
raise JSONProjectError(
f"{agent_def.path}: validation failed: {exc}"
@@ -45,11 +44,16 @@ def load_crew(
f"{agent_def.path}: failed to load agent: {exc}"
) from exc
agents_map: dict[str, Any] = {}
for name, agent_def in project.agents.items():
agents_map[name] = build_agent(agent_def)
tasks_list: list[Task] = []
task_name_map: dict[str, Task] = {}
for index, task_defn in enumerate(project.task_definitions):
source_label = f"{crew_path}: tasks[{index}]"
task_class = _task_class_from_definition(task_defn, f"{source_label}: type")
task_kwargs = _task_kwargs_from_definition(
task_defn,
agents_map=agents_map,
@@ -58,9 +62,13 @@ def load_crew(
project_root=crew_path.parent,
)
try:
task = Task(**task_kwargs)
task = task_class(**task_kwargs)
except ValidationError as exc:
raise JSONProjectError(f"{source_label}: validation failed: {exc}") from exc
except Exception as exc:
raise JSONProjectError(
f"{source_label}: failed to load task: {exc}"
) from exc
tasks_list.append(task)
task_name = task_defn.get("name")
@@ -69,7 +77,7 @@ def load_crew(
crew_kwargs = _crew_kwargs_from_definition(
project.definition,
agents=list(agents_map.values()),
agents=[agents_map[name] for name in project.agent_names],
tasks=tasks_list,
agents_map=agents_map,
source=crew_path,

View File

@@ -7,9 +7,9 @@ import json
import logging
from pathlib import Path
import re
from typing import Any
from typing import Any, cast
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
logger = logging.getLogger(__name__)
@@ -71,6 +71,66 @@ _CREW_RUNTIME_FIELDS = {
JSON_PROJECT_EXTENSIONS = (".jsonc", ".json")
PYTHON_REF_KEY = "python"
_AGENT_TYPE_ALIASES = {
"agent",
"Agent",
"crewai.Agent",
"crewai.agent.core.Agent",
}
_TASK_TYPE_ALIASES = {
"task",
"Task",
"crewai.Task",
"crewai.task.Task",
}
_CONDITIONAL_TASK_TYPE_ALIASES = {
"conditional",
"conditional_task",
"ConditionalTask",
"crewai.tasks.conditional_task.ConditionalTask",
}
_URI_RE = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*:")
_AGENT_CALLABLE_FIELDS = {"guardrail", "step_callback"}
_AGENT_CALLABLE_LIST_FIELDS = {"callbacks"}
_TASK_CALLABLE_FIELDS = {"callback", "condition", "guardrail"}
_TASK_CALLABLE_LIST_FIELDS = {"guardrails"}
_TASK_MODEL_CLASS_FIELDS = {"output_json", "output_pydantic", "response_model"}
_CREW_CALLABLE_FIELDS = {"step_callback", "task_callback"}
_CREW_CALLABLE_LIST_FIELDS = {"before_kickoff_callbacks", "after_kickoff_callbacks"}
_AGENT_OBJECT_REF_FIELDS = {
"agent_executor",
"checkpoint",
"embedder",
"function_calling_llm",
"i18n",
"knowledge",
"knowledge_config",
"knowledge_sources",
"knowledge_storage",
"llm",
"memory",
"planning_config",
"security_config",
"skills",
}
_TASK_OBJECT_REF_FIELDS = {"security_config"}
_CREW_OBJECT_REF_FIELDS = {
"chat_llm",
"checkpoint",
"embedder",
"function_calling_llm",
"knowledge",
"knowledge_sources",
"manager_agent",
"manager_llm",
"memory",
"planning_llm",
"security_config",
"skills",
}
@dataclass(frozen=True)
@@ -81,6 +141,7 @@ class JSONAgentDefinition:
path: Path
definition: dict[str, Any]
kwargs: dict[str, Any]
agent_class: type[Any]
@dataclass(frozen=True)
@@ -136,15 +197,19 @@ def load_jsonc_file(source: str | Path) -> Any:
def load_agent(source: str | Path) -> Any:
"""Load an existing ``Agent`` from a ``.json`` / ``.jsonc`` definition file."""
from crewai import Agent
path = Path(source)
defn = _expect_object(load_jsonc_file(path), path)
root = path.parent.parent if path.parent.name == "agents" else Path.cwd()
agent_kwargs = _agent_kwargs_from_definition(defn, path, project_root=root)
agent_class = _agent_class_from_definition(defn, f"{path}: type")
agent_kwargs = _agent_kwargs_from_definition(
defn,
path,
agent_class=agent_class,
project_root=root,
)
try:
return Agent(**agent_kwargs)
return agent_class(**agent_kwargs)
except ValidationError as exc:
raise JSONProjectError(_format_validation_error(path, exc)) from exc
except Exception as exc:
@@ -207,6 +272,7 @@ def load_json_crew_project(
{"inputs"},
)
)
fail_many(_python_reference_definition_errors(defn, crew_path))
agent_names = defn.get("agents", [])
if not isinstance(agent_names, list) or not agent_names:
@@ -215,10 +281,13 @@ def load_json_crew_project(
agents_dir = Path(agents_dir)
agent_definitions: dict[str, JSONAgentDefinition] = {}
for agent_name in agent_names:
def load_agent_definition(agent_name: str) -> None:
if not isinstance(agent_name, str) or not agent_name:
fail(f"{crew_path}: each agent reference must be a non-empty string")
continue
return
if agent_name in agent_definitions:
return
agent_file = find_json_project_file(agents_dir, agent_name)
if agent_file is None:
message = (
@@ -232,46 +301,69 @@ def load_json_crew_project(
)
else:
raise FileNotFoundError(message)
continue
return
try:
agent_defn = _expect_object(load_jsonc_file(agent_file), agent_file)
agent_class = _agent_class_from_definition(
agent_defn,
f"{agent_file}: type",
resolve_python_refs=not collect_errors,
)
agent_kwargs = _agent_kwargs_from_definition(
agent_defn,
agent_file,
agent_class=agent_class,
# Validation must never execute project code (custom tools).
resolve_tools=not collect_errors,
resolve_python_refs=not collect_errors,
project_root=crew_path.parent,
)
except Exception as exc:
if collect_errors:
errors.append(str(exc))
continue
return
raise
agent_definitions[agent_name] = JSONAgentDefinition(
name=agent_name,
path=agent_file,
definition=agent_defn,
kwargs=agent_kwargs,
agent_class=agent_class,
)
for agent_name in agent_names:
load_agent_definition(agent_name)
manager_agent = defn.get("manager_agent")
if manager_agent is not None:
if isinstance(manager_agent, str) and manager_agent:
load_agent_definition(manager_agent)
elif _is_python_ref(manager_agent):
pass
else:
fail(
f"{crew_path}: 'manager_agent' must be an agent definition name "
f'or a {{"{PYTHON_REF_KEY}": "module.agent"}} reference'
)
known_agents = set(agent_definitions)
task_defs = defn.get("tasks", [])
if not isinstance(task_defs, list) or not task_defs:
fail(f"{crew_path}: 'tasks' must be a non-empty list")
task_defs = []
known_tasks: set[str] = set()
known_agents = {name for name in agent_names if isinstance(name, str)}
for index, task_defn in enumerate(task_defs):
task_path = f"{crew_path}: tasks[{index}]"
if not isinstance(task_defn, dict):
fail(f"{task_path} must be an object")
continue
fail_many(
_field_errors(
_task_definition_errors(
task_defn,
_task_allowed_fields(),
_TASK_RUNTIME_FIELDS,
task_path,
resolve_python_refs=not collect_errors,
)
)
missing_required = [
@@ -284,7 +376,8 @@ def load_json_crew_project(
agent_ref = task_defn.get("agent")
if agent_ref is not None and agent_ref not in known_agents:
fail(
f"{task_path} references agent '{agent_ref}' which is not in the crew agents list"
f"{task_path} references agent '{agent_ref}' which does not match "
"a loaded agent definition"
)
fail_many(
@@ -422,19 +515,180 @@ def _expect_object(value: Any, source: str | Path) -> dict[str, Any]:
return value
def _is_python_ref(value: Any) -> bool:
return isinstance(value, dict) and PYTHON_REF_KEY in value
def _python_ref_errors(value: Any, source: str | Path) -> list[str]:
if not isinstance(value, dict):
return [
f"{source}: Python reference must be an object like "
f'{{"{PYTHON_REF_KEY}": "module.attribute"}}'
]
if set(value) != {PYTHON_REF_KEY}:
return [
f"{source}: Python reference objects must only contain '{PYTHON_REF_KEY}'"
]
path = value.get(PYTHON_REF_KEY)
if not isinstance(path, str) or not path.strip():
return [f"{source}: Python reference '{PYTHON_REF_KEY}' must be a string"]
if "." not in path:
return [
f"{source}: Python reference '{path}' must be a dotted import path "
"like 'module.attribute'"
]
return []
def _python_ref_path(value: Any, source: str | Path) -> str:
errors = _python_ref_errors(value, source)
if errors:
raise JSONProjectValidationError(errors)
path = cast(str, value[PYTHON_REF_KEY])
return path.strip()
def _resolve_python_ref(
value: Any,
source: str | Path,
*,
expected: str,
) -> Any:
from crewai.utilities.import_utils import import_and_validate_definition
path = _python_ref_path(value, source)
try:
resolved = import_and_validate_definition(path)
except Exception as exc:
raise JSONProjectError(f"{source}: failed to import '{path}': {exc}") from exc
if expected == "any":
return resolved
if expected == "callable" and not callable(resolved):
raise JSONProjectError(f"{source}: Python reference '{path}' is not callable")
if expected == "class" and not isinstance(resolved, type):
raise JSONProjectError(f"{source}: Python reference '{path}' is not a class")
return resolved
def _resolve_python_class(
value: Any,
source: str | Path,
*,
base_class: type[Any] | None = None,
) -> type[Any]:
cls = cast(type[Any], _resolve_python_ref(value, source, expected="class"))
if base_class is not None and not issubclass(cls, base_class):
raise JSONProjectError(
f"{source}: Python reference '{_python_ref_path(value, source)}' "
f"must be a subclass of {base_class.__module__}.{base_class.__name__}"
)
return cls
def _agent_class_from_definition(
defn: dict[str, Any],
source: str | Path,
*,
resolve_python_refs: bool = True,
) -> type[Any]:
from crewai import Agent
agent_class = cast(type[Any], Agent)
type_value = defn.get("type")
if type_value is None:
return agent_class
if isinstance(type_value, str) and type_value in _AGENT_TYPE_ALIASES:
return agent_class
if _is_python_ref(type_value):
if not resolve_python_refs:
errors = _python_ref_errors(type_value, source)
if errors:
raise JSONProjectValidationError(errors)
return agent_class
from crewai.agents.agent_builder.base_agent import BaseAgent
return _resolve_python_class(type_value, source, base_class=BaseAgent)
if isinstance(type_value, str):
raise JSONProjectError(
f"{source}: unsupported agent type '{type_value}'. Use 'Agent' or "
f'{{"{PYTHON_REF_KEY}": "module.CustomAgent"}}.'
)
raise JSONProjectValidationError(_python_ref_errors(type_value, source))
def _task_class_from_definition(
defn: dict[str, Any],
source: str | Path,
*,
resolve_python_refs: bool = True,
) -> type[Any]:
from crewai import Task
task_class = cast(type[Any], Task)
type_value = defn.get("type")
if type_value is None:
return task_class
if isinstance(type_value, str) and type_value in _TASK_TYPE_ALIASES:
return task_class
if isinstance(type_value, str) and type_value in _CONDITIONAL_TASK_TYPE_ALIASES:
from crewai.tasks.conditional_task import ConditionalTask
return cast(type[Any], ConditionalTask)
if _is_python_ref(type_value):
if not resolve_python_refs:
errors = _python_ref_errors(type_value, source)
if errors:
raise JSONProjectValidationError(errors)
return task_class
return _resolve_python_class(type_value, source, base_class=task_class)
if isinstance(type_value, str):
raise JSONProjectError(
f"{source}: unsupported task type '{type_value}'. Use 'Task', "
f"'ConditionalTask', or "
f'{{"{PYTHON_REF_KEY}": "module.CustomTask"}}.'
)
raise JSONProjectValidationError(_python_ref_errors(type_value, source))
def _model_fields_for(model_cls: type[Any], source: str | Path) -> set[str]:
fields = getattr(model_cls, "model_fields", None)
if not isinstance(fields, dict):
raise JSONProjectError(
f"{source}: {model_cls.__module__}.{model_cls.__name__} must be a "
"Pydantic model class"
)
return set(fields)
def _definition_has_python_type(defn: dict[str, Any]) -> bool:
return _is_python_ref(defn.get("type"))
def _agent_kwargs_from_definition(
defn: dict[str, Any],
path: Path | str,
*,
agent_class: type[Any] | None = None,
resolve_tools: bool = True,
resolve_python_refs: bool = True,
project_root: Path | None = None,
) -> dict[str, Any]:
agent_class = agent_class or _agent_class_from_definition(
defn,
f"{path}: type",
resolve_python_refs=resolve_python_refs,
)
allowed_fields = _agent_allowed_fields(agent_class)
extra_allowed = {"settings", "type"}
skip_unknown = _definition_has_python_type(defn) and not resolve_python_refs
errors = _field_errors(
defn,
_agent_allowed_fields(),
allowed_fields,
_AGENT_RUNTIME_FIELDS,
path,
{"settings"},
extra_allowed,
skip_unknown=skip_unknown,
)
for required in ("role", "goal", "backstory"):
if required not in defn:
@@ -450,21 +704,26 @@ def _agent_kwargs_from_definition(
errors.extend(
_field_errors(
settings,
_agent_allowed_fields(),
allowed_fields,
_AGENT_RUNTIME_FIELDS,
f"{path}: settings",
skip_unknown=skip_unknown,
)
)
errors.extend(_python_reference_definition_errors(defn, path))
if isinstance(settings, dict):
errors.extend(
_python_reference_definition_errors(settings, f"{path}: settings")
)
if errors:
raise JSONProjectValidationError(errors)
agent_kwargs = {
key: value for key, value in defn.items() if key in _agent_allowed_fields()
}
agent_kwargs = {key: value for key, value in defn.items() if key in allowed_fields}
agent_kwargs.update(settings)
if resolve_tools:
_resolve_tool_fields(agent_kwargs, project_root=project_root)
_resolve_agent_python_refs(agent_kwargs, path)
else:
# Validation/deploy mode: check tool declarations structurally without
# importing or instantiating anything — custom:<name> tools execute
@@ -484,24 +743,28 @@ def _task_kwargs_from_definition(
source: str,
project_root: Path | None = None,
) -> dict[str, Any]:
task_class = _task_class_from_definition(task_defn, f"{source}: type")
allowed_fields = _task_allowed_fields(task_class)
errors = _field_errors(
task_defn,
_task_allowed_fields(),
allowed_fields,
_TASK_RUNTIME_FIELDS,
source,
{"type"},
)
if errors:
raise JSONProjectValidationError(errors)
task_kwargs = {
key: value for key, value in task_defn.items() if key in _task_allowed_fields()
key: value for key, value in task_defn.items() if key in allowed_fields
}
agent_ref = task_kwargs.get("agent")
if agent_ref is not None and isinstance(agent_ref, str):
if agent_ref not in agents_map:
raise JSONProjectError(
f"{source} references agent '{agent_ref}' which is not in the crew agents list"
f"{source} references agent '{agent_ref}' which does not match "
"a loaded agent definition"
)
task_kwargs["agent"] = agents_map[agent_ref]
@@ -518,6 +781,13 @@ def _task_kwargs_from_definition(
task_kwargs["context"] = context_tasks
_resolve_tool_fields(task_kwargs, project_root=project_root)
_resolve_task_python_refs(task_kwargs, source)
if "input_files" in task_kwargs:
task_kwargs["input_files"] = _normalize_input_files(
task_kwargs["input_files"],
source,
project_root,
)
return task_kwargs
@@ -548,10 +818,12 @@ def _crew_kwargs_from_definition(
if isinstance(manager_agent, str):
if manager_agent not in agents_map:
raise JSONProjectError(
f"{source}: manager_agent '{manager_agent}' is not in the crew agents list"
f"{source}: manager_agent '{manager_agent}' does not match an agent definition"
)
crew_kwargs["manager_agent"] = agents_map[manager_agent]
_resolve_crew_python_refs(crew_kwargs, source)
return crew_kwargs
@@ -561,6 +833,8 @@ def _resolve_tool_fields(
tools = kwargs.get("tools")
if tools is not None:
kwargs["tools"] = _resolve_tools(tools, project_root=project_root)
if "mcps" in kwargs:
kwargs["mcps"] = _resolve_mcp_python_refs(kwargs["mcps"])
def _field_errors(
@@ -569,11 +843,17 @@ def _field_errors(
runtime_fields: set[str],
source: str | Path,
extra_allowed: set[str] | None = None,
*,
skip_unknown: bool = False,
) -> list[str]:
extra_allowed = extra_allowed or set()
keys = set(data)
runtime = sorted(keys & runtime_fields)
unknown = sorted(keys - allowed_fields - runtime_fields - extra_allowed)
unknown = (
[]
if skip_unknown
else sorted(keys - allowed_fields - runtime_fields - extra_allowed)
)
errors: list[str] = []
if runtime:
@@ -586,16 +866,16 @@ def _field_errors(
return errors
def _agent_allowed_fields() -> set[str]:
def _agent_allowed_fields(agent_class: type[Any] | None = None) -> set[str]:
from crewai import Agent
return set(Agent.model_fields) - _AGENT_RUNTIME_FIELDS
return _model_fields_for(agent_class or Agent, "agent type") - _AGENT_RUNTIME_FIELDS
def _task_allowed_fields() -> set[str]:
def _task_allowed_fields(task_class: type[Any] | None = None) -> set[str]:
from crewai import Task
return set(Task.model_fields) - _TASK_RUNTIME_FIELDS
return _model_fields_for(task_class or Task, "task type") - _TASK_RUNTIME_FIELDS
def _crew_allowed_fields() -> set[str]:
@@ -604,6 +884,417 @@ def _crew_allowed_fields() -> set[str]:
return set(Crew.model_fields) - _CREW_RUNTIME_FIELDS
def _task_definition_errors(
task_defn: dict[str, Any],
source: str | Path,
*,
resolve_python_refs: bool,
) -> list[str]:
skip_unknown = _definition_has_python_type(task_defn) and not resolve_python_refs
try:
task_class = _task_class_from_definition(
task_defn,
f"{source}: type",
resolve_python_refs=resolve_python_refs,
)
except JSONProjectValidationError as exc:
return exc.errors
except JSONProjectError as exc:
return [str(exc)]
errors = _field_errors(
task_defn,
_task_allowed_fields(task_class),
_TASK_RUNTIME_FIELDS,
source,
{"type"},
skip_unknown=skip_unknown,
)
errors.extend(_python_reference_definition_errors(task_defn, source))
return errors
def _python_reference_definition_errors(
defn: dict[str, Any],
source: str | Path,
) -> list[str]:
errors: list[str] = []
for field in (
_AGENT_CALLABLE_FIELDS
| _AGENT_CALLABLE_LIST_FIELDS
| _TASK_CALLABLE_FIELDS
| _TASK_CALLABLE_LIST_FIELDS
| _TASK_MODEL_CLASS_FIELDS
| _CREW_CALLABLE_FIELDS
| _CREW_CALLABLE_LIST_FIELDS
| {"converter_cls", "executor_class"}
):
if field not in defn:
continue
errors.extend(_python_reference_value_errors(defn[field], f"{source}: {field}"))
for field in (
_AGENT_OBJECT_REF_FIELDS | _TASK_OBJECT_REF_FIELDS | _CREW_OBJECT_REF_FIELDS
):
if field not in defn:
continue
errors.extend(
_python_reference_value_errors_recursive(defn[field], f"{source}: {field}")
)
errors.extend(
_embedder_python_ref_errors(defn.get("embedder"), f"{source}: embedder")
)
errors.extend(_a2a_python_ref_errors(defn.get("a2a"), f"{source}: a2a"))
errors.extend(_mcp_python_ref_errors(defn.get("mcps"), f"{source}: mcps"))
type_value = defn.get("type")
if _is_python_ref(type_value):
errors.extend(_python_ref_errors(type_value, f"{source}: type"))
return errors
def _python_reference_value_errors(value: Any, source: str | Path) -> list[str]:
errors: list[str] = []
if _is_python_ref(value):
return _python_ref_errors(value, source)
if isinstance(value, list):
for index, item in enumerate(value):
if _is_python_ref(item):
errors.extend(_python_ref_errors(item, f"{source}[{index}]"))
return errors
def _python_reference_value_errors_recursive(
value: Any, source: str | Path
) -> list[str]:
if _is_python_ref(value):
return _python_ref_errors(value, source)
errors: list[str] = []
if isinstance(value, list):
for index, item in enumerate(value):
errors.extend(
_python_reference_value_errors_recursive(item, f"{source}[{index}]")
)
elif isinstance(value, dict):
for key, item in value.items():
errors.extend(
_python_reference_value_errors_recursive(item, f"{source}.{key}")
)
return errors
def _embedder_python_ref_errors(value: Any, source: str | Path) -> list[str]:
if not isinstance(value, dict):
return []
config = value.get("config")
if not isinstance(config, dict):
return []
embedding_callable = config.get("embedding_callable")
if _is_python_ref(embedding_callable):
return _python_ref_errors(
embedding_callable, f"{source}.config.embedding_callable"
)
return []
def _a2a_python_ref_errors(value: Any, source: str | Path) -> list[str]:
configs = value if isinstance(value, list) else [value]
errors: list[str] = []
for index, config in enumerate(configs):
if not isinstance(config, dict):
continue
response_model = config.get("response_model")
if _is_python_ref(response_model):
errors.extend(
_python_ref_errors(response_model, f"{source}[{index}].response_model")
)
return errors
def _mcp_python_ref_errors(value: Any, source: str | Path) -> list[str]:
if not isinstance(value, list):
return []
errors: list[str] = []
for index, config in enumerate(value):
if not isinstance(config, dict):
continue
tool_filter = config.get("tool_filter")
if _is_python_ref(tool_filter):
errors.extend(
_python_ref_errors(tool_filter, f"{source}[{index}].tool_filter")
)
elif isinstance(tool_filter, dict) and tool_filter.get("type") == "static":
for key in ("allowed_tool_names", "blocked_tool_names"):
names = tool_filter.get(key)
if names is not None and not _is_string_list(names):
errors.append(
f"{source}[{index}].tool_filter.{key} must be a list of strings"
)
return errors
def _resolve_agent_python_refs(kwargs: dict[str, Any], source: str | Path) -> None:
_resolve_callable_fields(
kwargs,
source,
scalar_fields=_AGENT_CALLABLE_FIELDS,
list_fields=_AGENT_CALLABLE_LIST_FIELDS,
)
if _is_python_ref(kwargs.get("executor_class")):
kwargs["executor_class"] = _resolve_python_class(
kwargs["executor_class"], f"{source}: executor_class"
)
if "embedder" in kwargs:
kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source)
if "a2a" in kwargs:
kwargs["a2a"] = _resolve_a2a_python_refs(kwargs["a2a"], source)
_resolve_object_reference_fields(kwargs, source, _AGENT_OBJECT_REF_FIELDS)
def _resolve_task_python_refs(kwargs: dict[str, Any], source: str | Path) -> None:
_resolve_callable_fields(
kwargs,
source,
scalar_fields=_TASK_CALLABLE_FIELDS,
list_fields=_TASK_CALLABLE_LIST_FIELDS,
)
for field in _TASK_MODEL_CLASS_FIELDS:
if _is_python_ref(kwargs.get(field)):
kwargs[field] = _resolve_model_class(kwargs[field], f"{source}: {field}")
if _is_python_ref(kwargs.get("converter_cls")):
from crewai.utilities.converter import Converter
kwargs["converter_cls"] = _resolve_python_class(
kwargs["converter_cls"],
f"{source}: converter_cls",
base_class=Converter,
)
elif isinstance(kwargs.get("converter_cls"), str):
raise JSONProjectError(
f"{source}: converter_cls must use "
f'{{"{PYTHON_REF_KEY}": "module.ConverterSubclass"}}'
)
_resolve_object_reference_fields(kwargs, source, _TASK_OBJECT_REF_FIELDS)
def _resolve_crew_python_refs(kwargs: dict[str, Any], source: str | Path) -> None:
_resolve_callable_fields(
kwargs,
source,
scalar_fields=_CREW_CALLABLE_FIELDS,
list_fields=_CREW_CALLABLE_LIST_FIELDS,
)
if "embedder" in kwargs:
kwargs["embedder"] = _resolve_embedder_python_refs(kwargs["embedder"], source)
_resolve_object_reference_fields(kwargs, source, _CREW_OBJECT_REF_FIELDS)
def _resolve_object_reference_fields(
kwargs: dict[str, Any],
source: str | Path,
fields: set[str],
) -> None:
for field in fields:
if field not in kwargs:
continue
kwargs[field] = _resolve_python_refs_recursively(
kwargs[field], f"{source}: {field}"
)
def _resolve_python_refs_recursively(value: Any, source: str | Path) -> Any:
if _is_python_ref(value):
return _resolve_python_ref(value, source, expected="any")
if isinstance(value, list):
return [
_resolve_python_refs_recursively(item, f"{source}[{index}]")
for index, item in enumerate(value)
]
if isinstance(value, dict):
return {
key: _resolve_python_refs_recursively(item, f"{source}.{key}")
for key, item in value.items()
}
return value
def _resolve_callable_fields(
kwargs: dict[str, Any],
source: str | Path,
*,
scalar_fields: set[str],
list_fields: set[str],
) -> None:
for field in scalar_fields:
if _is_python_ref(kwargs.get(field)):
kwargs[field] = _resolve_python_ref(
kwargs[field],
f"{source}: {field}",
expected="callable",
)
for field in list_fields:
value = kwargs.get(field)
if not isinstance(value, list):
continue
kwargs[field] = [
_resolve_python_ref(
item, f"{source}: {field}[{index}]", expected="callable"
)
if _is_python_ref(item)
else item
for index, item in enumerate(value)
]
def _resolve_model_class(value: Any, source: str | Path) -> type[BaseModel]:
return _resolve_python_class(value, source, base_class=BaseModel)
def _resolve_embedder_python_refs(value: Any, source: str | Path) -> Any:
if not isinstance(value, dict):
return value
config = value.get("config")
if not isinstance(config, dict):
return value
embedding_callable = config.get("embedding_callable")
if not _is_python_ref(embedding_callable):
return value
from crewai.rag.embeddings.providers.custom.embedding_callable import (
CustomEmbeddingFunction,
)
normalized = dict(value)
normalized_config = dict(config)
normalized_config["embedding_callable"] = _resolve_python_class(
embedding_callable,
f"{source}: embedder.config.embedding_callable",
base_class=CustomEmbeddingFunction,
)
normalized["config"] = normalized_config
return normalized
def _resolve_a2a_python_refs(value: Any, source: str | Path) -> Any:
if isinstance(value, list):
return [
_resolve_a2a_python_refs(item, f"{source}: a2a[{index}]")
for index, item in enumerate(value)
]
if not isinstance(value, dict):
return value
response_model = value.get("response_model")
if response_model is None:
return value
normalized = dict(value)
if _is_python_ref(response_model):
normalized["response_model"] = _resolve_model_class(
response_model,
f"{source}: a2a.response_model",
)
elif isinstance(response_model, dict):
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
normalized["response_model"] = create_model_from_schema(response_model)
return normalized
def _resolve_mcp_python_refs(value: Any) -> Any:
if not isinstance(value, list):
return value
return [
_resolve_mcp_config_python_refs(config, index)
if isinstance(config, dict)
else config
for index, config in enumerate(value)
]
def _resolve_mcp_config_python_refs(
config: dict[str, Any], index: int
) -> dict[str, Any]:
tool_filter = config.get("tool_filter")
if tool_filter is None:
return config
normalized = dict(config)
if _is_python_ref(tool_filter):
normalized["tool_filter"] = _resolve_python_ref(
tool_filter,
f"mcps[{index}].tool_filter",
expected="callable",
)
elif isinstance(tool_filter, dict) and tool_filter.get("type") == "static":
from crewai.mcp.filters import create_static_tool_filter
allowed_tool_names = tool_filter.get("allowed_tool_names")
blocked_tool_names = tool_filter.get("blocked_tool_names")
if allowed_tool_names is not None and not _is_string_list(allowed_tool_names):
raise JSONProjectValidationError(
[
f"mcps[{index}].tool_filter.allowed_tool_names must be a list of strings"
]
)
if blocked_tool_names is not None and not _is_string_list(blocked_tool_names):
raise JSONProjectValidationError(
[
f"mcps[{index}].tool_filter.blocked_tool_names must be a list of strings"
]
)
normalized["tool_filter"] = create_static_tool_filter(
allowed_tool_names=allowed_tool_names,
blocked_tool_names=blocked_tool_names,
)
return normalized
def _is_string_list(value: Any) -> bool:
return isinstance(value, list) and all(isinstance(item, str) for item in value)
def _normalize_input_files(
value: Any,
source: str | Path,
project_root: Path | None,
) -> Any:
if value is None:
return value
if not isinstance(value, dict):
raise JSONProjectValidationError(
[f"{source}: input_files must be an object mapping names to file specs"]
)
normalized: dict[str, Any] = {}
for name, file_spec in value.items():
if isinstance(file_spec, str):
normalized[name] = {
"source": _resolve_project_path(file_spec, project_root)
}
continue
if isinstance(file_spec, dict):
normalized_spec = dict(file_spec)
for field in ("source", "path"):
field_value = normalized_spec.get(field)
if isinstance(field_value, str):
normalized_spec[field] = _resolve_project_path(
field_value, project_root
)
normalized[name] = normalized_spec
continue
normalized[name] = file_spec
return normalized
def _resolve_project_path(value: str, project_root: Path | None) -> str:
if not value or _URI_RE.match(value):
return value
path = Path(value)
if path.is_absolute():
return value
return str(((project_root or Path.cwd()) / path).resolve())
def _format_validation_error(path: str | Path, exc: ValidationError) -> str:
return f"{path}: validation failed: {exc}"

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import dataclasses
from datetime import date, datetime
import json
from typing import Any, TypeAlias
@@ -23,21 +24,23 @@ def to_serializable(
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
Supports primitives, datetime objects, collections, dictionaries, and
Pydantic models. Recursion depth is limited to prevent infinite nesting.
Supports primitives, datetime objects, collections, dictionaries,
dataclasses, and Pydantic models. Recursion depth is limited to prevent
infinite nesting.
Non-convertible objects default to their string representations.
Args:
obj: Object to transform.
exclude: Set of keys to exclude from the result.
max_depth: Maximum recursion depth. Defaults to 5.
max_depth: Maximum recursion depth. Defaults to 5. Values less than or
equal to 0 disable the depth limit.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
Returns:
Serializable: A JSON-compatible structure.
"""
if _current_depth >= max_depth:
if max_depth > 0 and _current_depth >= max_depth:
return repr(obj)
if exclude is None:
@@ -58,6 +61,18 @@ def to_serializable(
return f"<circular_ref:{type(obj).__name__}>"
new_ancestors = _ancestors | {object_id}
if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
return {
field.name: to_serializable(
obj=getattr(obj, field.name),
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for field in dataclasses.fields(obj)
if field.name not in exclude
}
if isinstance(obj, (list, tuple, set)):
return [
to_serializable(
@@ -84,7 +99,7 @@ def to_serializable(
if isinstance(obj, BaseModel):
try:
return to_serializable(
obj=obj.model_dump(exclude=exclude),
obj=obj.model_dump(mode="json", exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,

View File

@@ -12,6 +12,36 @@ from crewai.project.json_loader import JSONProjectError, JSONProjectValidationEr
from crewai.project.crew_loader import load_crew
def _write_python_defs(tmp_path: Path) -> None:
module = tmp_path / "json_refs.py"
module.write_text(
"from pydantic import BaseModel\n"
"from crewai import Agent, Task\n"
"from crewai.security.security_config import SecurityConfig\n"
"from crewai.utilities.converter import Converter\n"
"\n"
"def always_true(_context):\n"
" return True\n"
"\n"
"def task_callback(output):\n"
" return output\n"
"\n"
"class SpecialAgent(Agent):\n"
" specialty: str = 'general'\n"
"\n"
"class SpecialTask(Task):\n"
" priority: int = 0\n"
"\n"
"class ReportModel(BaseModel):\n"
" summary: str\n"
"\n"
"class SpecialConverter(Converter):\n"
" pass\n"
"\n"
"security_config = SecurityConfig(fingerprint='agent-seed')\n"
)
def _write_agent(agents_dir: Path, name: str, **overrides) -> Path:
defn = {
"role": f"{name} role",
@@ -30,6 +60,15 @@ def _write_crew(project_dir: Path, crew_def: dict) -> Path:
return f
def _input_file_path(value) -> Path:
if isinstance(value, dict):
source = value.get("source", value)
else:
source = getattr(value, "source", value)
path = getattr(source, "path", source)
return Path(str(path))
class TestLoadCrew:
def test_minimal_crew(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
@@ -139,6 +178,38 @@ class TestLoadCrew:
from crewai import Process
assert crew.process == Process.hierarchical
def test_crew_hierarchical_manager_agent_from_separate_agent_file(
self, tmp_path: Path
):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "worker")
_write_agent(agents_dir, "manager")
crew_def = {
"name": "hier_manager_crew",
"agents": ["worker"],
"tasks": [
{
"name": "work",
"description": "Do work",
"expected_output": "Work done",
"agent": "manager",
}
],
"process": "hierarchical",
"manager_agent": "manager",
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
assert len(crew.agents) == 1
assert crew.agents[0].role == "worker role"
assert crew.manager_agent is not None
assert crew.manager_agent.role == "manager role"
assert crew.tasks[0].agent is crew.manager_agent
def test_crew_accepts_llm_config_objects(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
@@ -289,6 +360,156 @@ class TestLoadCrew:
assert task.guardrail == "Return a summary field."
assert task.allow_crewai_trigger_context is False
def test_crew_loads_conditional_task_with_python_condition(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
_write_python_defs(tmp_path)
monkeypatch.syspath_prepend(str(tmp_path))
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "worker")
crew_def = {
"name": "conditional_crew",
"agents": ["worker"],
"tasks": [
{
"name": "first",
"description": "First task",
"expected_output": "First output",
"agent": "worker",
},
{
"type": "ConditionalTask",
"name": "second",
"description": "Second task",
"expected_output": "Second output",
"agent": "worker",
"condition": {"python": "json_refs.always_true"},
},
],
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
from crewai.tasks.conditional_task import ConditionalTask
assert isinstance(crew.tasks[1], ConditionalTask)
assert crew.tasks[1].should_execute(None)
def test_crew_loads_custom_agent_and_task_types(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
_write_python_defs(tmp_path)
monkeypatch.syspath_prepend(str(tmp_path))
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(
agents_dir,
"specialist",
type={"python": "json_refs.SpecialAgent"},
security_config={"python": "json_refs.security_config"},
specialty="research",
)
crew_def = {
"name": "custom_types_crew",
"agents": ["specialist"],
"tasks": [
{
"type": {"python": "json_refs.SpecialTask"},
"name": "prioritized",
"description": "Do prioritized work",
"expected_output": "Prioritized output",
"agent": "specialist",
"priority": 7,
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
assert crew.agents[0].__class__.__name__ == "SpecialAgent"
assert crew.agents[0].specialty == "research"
from crewai.security.fingerprint import Fingerprint
assert crew.agents[0].security_config.fingerprint == Fingerprint.generate(
seed="agent-seed"
)
assert crew.tasks[0].__class__.__name__ == "SpecialTask"
assert crew.tasks[0].priority == 7
def test_crew_loads_python_ref_task_fields(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
):
_write_python_defs(tmp_path)
monkeypatch.syspath_prepend(str(tmp_path))
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "writer")
crew_def = {
"name": "python_refs_crew",
"agents": ["writer"],
"tasks": [
{
"name": "write",
"description": "Write something",
"expected_output": "Written content",
"agent": "writer",
"callback": {"python": "json_refs.task_callback"},
"output_json": {"python": "json_refs.ReportModel"},
"converter_cls": {"python": "json_refs.SpecialConverter"},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
task = crew.tasks[0]
assert task.callback.__name__ == "task_callback"
assert task.output_json.__name__ == "ReportModel"
assert "summary" in task.output_json.model_fields
assert task.converter_cls.__name__ == "SpecialConverter"
def test_crew_loads_project_relative_input_files(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()
_write_agent(agents_dir, "reader")
data_dir = tmp_path / "data"
data_dir.mkdir()
brief_path = data_dir / "brief.txt"
spec_path = data_dir / "spec.md"
brief_path.write_text("brief")
spec_path.write_text("spec")
crew_def = {
"name": "input_files_crew",
"agents": ["reader"],
"tasks": [
{
"name": "read",
"description": "Read files",
"expected_output": "File summary",
"agent": "reader",
"input_files": {
"brief": "data/brief.txt",
"spec": {"source": "data/spec.md"},
},
}
],
}
crew_file = _write_crew(tmp_path, crew_def)
crew, _ = load_crew(crew_file)
input_files = crew.tasks[0].input_files
assert _input_file_path(input_files["brief"]) == brief_path
assert _input_file_path(input_files["spec"]) == spec_path
def test_missing_agent_file_raises(self, tmp_path: Path):
agents_dir = tmp_path / "agents"
agents_dir.mkdir()

View File

@@ -248,6 +248,33 @@ class TestLoadAgent:
assert len(agent.tools or []) == 1
assert agent.tools[0].name == "echo"
def test_load_agent_accepts_static_mcp_tool_filter(self, tmp_path: Path):
agent_def = {
"role": "MCP User",
"goal": "Use MCP tools",
"backstory": "MCP expert.",
"mcps": [
{
"command": "python",
"args": ["server.py"],
"tool_filter": {
"type": "static",
"allowed_tool_names": ["read_file"],
"blocked_tool_names": ["delete_file"],
},
}
],
}
agent_file = tmp_path / "agent.json"
agent_file.write_text(json.dumps(agent_def))
agent = load_agent(agent_file)
tool_filter = agent.mcps[0].tool_filter
assert tool_filter({"name": "read_file"})
assert not tool_filter({"name": "delete_file"})
assert not tool_filter({"name": "write_file"})
def test_load_agent_rejects_runtime_fields(self, tmp_path: Path):
agent_def = {
"id": "00000000-0000-4000-8000-000000000000",
@@ -399,6 +426,33 @@ class TestValidationDoesNotExecuteTools:
assert not sentinel.exists(), "validation must not execute tools/<name>.py"
assert project.agent_names == ["worker"]
def test_validate_does_not_import_python_refs(
self, tmp_path, monkeypatch: pytest.MonkeyPatch
):
from crewai.project.json_loader import validate_crew_project
sentinel = tmp_path / "python_ref_executed.txt"
(tmp_path / "callbacks.py").write_text(
"from pathlib import Path\n"
f"Path({str(sentinel)!r}).write_text('boom')\n"
"def step_callback(*_args, **_kwargs):\n"
" return None\n"
)
monkeypatch.syspath_prepend(str(tmp_path))
sys.modules.pop("callbacks", None)
crew_path = self._write_project(
tmp_path,
tool_line='{"tool_type": "some.module.Tool"}',
)
agent_file = tmp_path / "agents" / "worker.jsonc"
agent_def = json.loads(agent_file.read_text())
agent_def["step_callback"] = {"python": "callbacks.step_callback"}
agent_file.write_text(json.dumps(agent_def))
validate_crew_project(crew_path, tmp_path / "agents")
assert not sentinel.exists(), "validation must not import Python refs"
def test_validate_reports_missing_custom_tool_file(self, tmp_path):
from crewai.project.json_loader import (
JSONProjectValidationError,

View File

@@ -44,6 +44,8 @@ def test_flow_public_exports_are_explicit():
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
"FlowExpressionActionDefinition",
"FlowHumanFeedbackDefinition",
"FlowMethodDefinition",
@@ -432,6 +434,73 @@ def test_flow_definition_round_trips_json_and_yaml():
assert yaml_round_trip.methods["decide"].listen == "begin"
def test_each_action_round_trips_json_and_yaml():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
"methods": {
"process_rows": {
"description": "Process every loaded row.",
"start": True,
"do": {
"call": "each",
"in": "state.rows",
"do": [
{
"normalize": {
"call": "tool",
"ref": "my_tools:NormalizeRowTool",
"with": {"row": "${ item }"},
}
},
{
"save": {
"call": "code",
"ref": "my_flow:save_row",
"with": {
"row": "${ item }",
"normalized": "${ outputs.normalize }",
},
}
},
],
},
}
},
}
)
json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json())
yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml())
assert json_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.methods["process_rows"].description == (
"Process every loaded row."
)
assert yaml_round_trip.methods["process_rows"].do.call == "each"
def test_flow_definition_rejects_invalid_method_names():
with pytest.raises(ValueError, match="Flow method names must match"):
flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "InvalidMethodNameFlow",
"methods": {
"process-rows": {
"start": True,
"do": {
"call": "expression",
"expr": "'done'",
},
}
},
}
)
def test_flow_definition_detects_persist_metadata():
@persist(verbose=True)
class PersistedFlow(Flow[dict]):

View File

@@ -1,12 +1,15 @@
from __future__ import annotations
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
import threading
from typing import Any, ClassVar
from unittest.mock import patch
import pytest
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
@@ -44,6 +47,26 @@ class TypedInputsTool(BaseTool):
return f"{count}:{','.join(include_domains)}"
class AsyncResultTool(BaseTool):
name: str = "AsyncResultTool"
description: str = "Returns an async result from its sync entrypoint."
def _run(self, value: str) -> Any:
async def build_result() -> str:
await asyncio.sleep(0)
return f"async:{value}"
return build_result()
class CallableCodeAction:
def __call__(self, value: str) -> str:
return f"callable:{value}"
CALLABLE_CODE_ACTION = CallableCodeAction()
class ChainFlow(Flow):
@start()
def begin(self):
@@ -67,6 +90,41 @@ class ToolInputFlow(Flow):
return {"query": "ai agents", "suffix": " news"}
class EachActionFlow(Flow):
inner_thread_id: int | None = None
def normalize_row(self, row: str, prefix: str = "normalized") -> str:
return f"{prefix}:{row}"
def save_row(self, row: str, normalized: str) -> dict[str, str]:
return {"row": row, "normalized": normalized}
def keyword_code(self, name: str, punctuation: str) -> str:
return f"{name}{punctuation}"
def fail_on_bad_row(self, row: str) -> str:
if row == "bad":
raise RuntimeError("bad row")
return row
def require_threaded_context(self, row: str) -> str:
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
raise RuntimeError("inner action ran on the event loop")
from crewai.flow.flow_context import current_flow_method_name
self.inner_thread_id = threading.get_ident()
return f"{current_flow_method_name.get()}:{row}"
def after_each(self) -> str:
self.state["after_count"] = self.state.get("after_count", 0) + 1
return f"after:{self.state['after_count']}"
CHAIN_YAML = f"""
schema: crewai.flow/v1
name: ChainFlow
@@ -727,6 +785,381 @@ methods:
flow.kickoff()
def test_code_action_renders_keyword_inputs():
yaml_str = f"""
schema: crewai.flow/v1
name: CodeWithFlow
methods:
greet:
do:
call: code
ref: {__name__}:EachActionFlow.keyword_code
with:
name: "${{state.name}}"
punctuation: "!"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"name": "hello"}) == "hello!"
def test_code_action_supports_callable_instance_refs():
yaml_str = f"""
schema: crewai.flow/v1
name: CallableInstanceFlow
methods:
call_instance:
do:
call: code
ref: {__name__}:CALLABLE_CODE_ACTION
with:
value: "${{state.value}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"value": "ok"}) == "callable:ok"
def test_each_action_executes_one_nested_code_action():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- normalize:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "${{item}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"normalized:a",
"normalized:b",
]
def test_each_action_runs_sync_inner_actions_off_event_loop_with_context():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- threaded:
call: code
ref: {__name__}:EachActionFlow.require_threaded_context
with:
row: "${{item}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
caller_thread_id = threading.get_ident()
assert flow.kickoff(inputs={"rows": ["a"]}) == ["process_rows:a"]
assert flow.inner_thread_id is not None
assert flow.inner_thread_id != caller_thread_id
def test_each_action_runs_async_tool_results_from_sync_inner_actions():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- async_tool:
call: tool
ref: {__name__}:AsyncResultTool
with:
value: "${{item}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["async:a", "async:b"]
def test_each_action_uses_iteration_outputs_between_nested_actions():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- normalize:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "${{item}}"
prefix: saved
- save:
call: code
ref: {__name__}:EachActionFlow.save_row
with:
row: "${{item}}"
normalized: "${{outputs.normalize}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
{"row": "a", "normalized": "saved:a"},
{"row": "b", "normalized": "saved:b"},
]
def test_each_action_resets_inner_outputs_between_iterations():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- leak_check:
call: expression
expr: "has(outputs.previous) ? outputs.previous : 'empty'"
- previous:
call: expression
expr: item
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == ["a", "b"]
assert flow._method_outputs == [
{"method": "process_rows", "output": ["a", "b"]}
]
def test_each_action_preserves_flow_outputs_and_prefers_inner_outputs():
yaml_str = """
schema: crewai.flow/v1
name: EachFlow
methods:
seed:
do:
call: expression
expr: "'global'"
start: true
process_rows:
do:
call: each
in: state.rows
do:
- before_shadow:
call: expression
expr: "outputs.seed + ':' + item"
- seed:
call: expression
expr: "'local:' + item"
- after_shadow:
call: expression
expr: "outputs.seed"
listen: seed
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff(inputs={"rows": ["a", "b"]}) == [
"local:a",
"local:b",
]
assert flow._method_outputs == [
{"method": "seed", "output": "global"},
{"method": "process_rows", "output": ["local:a", "local:b"]},
]
def test_each_action_empty_list_returns_empty_and_listener_runs_once():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- normalize:
call: code
ref: {__name__}:EachActionFlow.normalize_row
with:
row: "${{item}}"
start: true
after_each:
do:
call: code
ref: {__name__}:EachActionFlow.after_each
listen: process_rows
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
events = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_finished(source, event):
events.append(event.method_name)
result = flow.kickoff(inputs={"rows": []})
assert result == "after:1"
assert flow.method_outputs == [[], "after:1"]
assert flow.state["after_count"] == 1
assert events.count("process_rows") == 1
assert events.count("after_each") == 1
@pytest.mark.parametrize(
("expr", "inputs"),
[
("1", {}),
('"rows"', {}),
("state.rows", {"rows": {"a": 1}}),
],
)
def test_each_action_rejects_non_list_inputs(expr, inputs):
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
"methods": {
"process_rows": {
"start": True,
"do": {
"call": "each",
"in": expr,
"do": [{"value": {"call": "expression", "expr": "item"}}],
},
}
},
}
)
flow = Flow.from_definition(definition)
with pytest.raises(ValueError, match="each.in must evaluate to an array"):
flow.kickoff(inputs=inputs)
@pytest.mark.parametrize(
"action_do",
[
[],
[{"first": {"call": "expression", "expr": "item"}, "second": {"call": "expression", "expr": "item"}}],
[{"1bad": {"call": "expression", "expr": "item"}}],
[
{"same": {"call": "expression", "expr": "item"}},
{"same": {"call": "expression", "expr": "item"}},
],
],
)
def test_each_action_validates_inner_action_shape(action_do):
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
"methods": {
"process_rows": {
"start": True,
"do": {
"call": "each",
"in": "state.rows",
"do": action_do,
},
}
},
}
)
def test_each_action_rejects_nested_each_actions():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "EachFlow",
"methods": {
"process_rows": {
"start": True,
"do": {
"call": "each",
"in": "state.rows",
"do": [
{
"nested": {
"call": "each",
"in": "state.children",
"do": [
{
"child": {
"call": "expression",
"expr": "item",
}
}
],
}
}
],
},
}
},
}
)
def test_each_action_failure_fails_outer_method():
yaml_str = f"""
schema: crewai.flow/v1
name: EachFlow
methods:
process_rows:
do:
call: each
in: state.rows
do:
- validate:
call: code
ref: {__name__}:EachActionFlow.fail_on_bad_row
with:
row: "${{item}}"
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
with pytest.raises(RuntimeError, match="bad row"):
flow.kickoff(inputs={"rows": ["ok", "bad"]})
def test_expression_action_round_trips():
definition = FlowDefinition.from_dict(
{
@@ -751,6 +1184,26 @@ def test_expression_action_round_trips():
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
def test_expression_local_context_recurses_into_dataclass_values():
from crewai.flow.runtime._expressions import evaluate_expression
class Payload(BaseModel):
name: str
@dataclass
class Row:
payload: Payload
assert (
evaluate_expression(
Flow(),
"item.payload.name",
local_context={"item": Row(payload=Payload(name="qualified"))},
)
== "qualified"
)
def test_expression_action_can_route_like_if_else():
yaml_str = f"""
schema: crewai.flow/v1
@@ -830,26 +1283,6 @@ def test_tool_action_requires_module_qualname_ref():
Flow.from_definition(definition)
def test_code_action_rejects_tool_inputs():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "InvalidCodeActionFlow",
"methods": {
"begin": {
"start": True,
"do": {
"call": "code",
"ref": f"{__name__}:ChainFlow.begin",
"with": {"search_query": "ai agents"},
},
}
},
}
)
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import date, datetime
from typing import List
from typing import Any, List
import pytest
from crewai.utilities.serialization import to_serializable, to_string
@@ -20,6 +21,13 @@ class Person(BaseModel):
skills: List[str]
@dataclass
class DataclassPerson:
name: str
address: Address
skills: tuple[str, ...]
@pytest.mark.parametrize(
"test_input,expected",
[
@@ -106,6 +114,24 @@ def test_pydantic_model_serialization():
)
def test_dataclass_serialization_recurses_into_nested_values():
person = DataclassPerson(
name="Ada",
address=Address(street="1 Loop", city="Compute", country="Pythonia"),
skills=("Python", "Math"),
)
assert to_serializable(person) == {
"name": "Ada",
"address": {
"street": "1 Loop",
"city": "Compute",
"country": "Pythonia",
},
"skills": ["Python", "Math"],
}
def test_depth_limit():
"""Test max depth handling with a deeply nested structure"""
@@ -130,6 +156,27 @@ def test_depth_limit():
}
@pytest.mark.parametrize("max_depth", [0, -1])
def test_non_positive_max_depth_disables_depth_limit(max_depth):
def create_nested(depth):
if depth == 0:
return "value"
return {"next": create_nested(depth - 1)}
assert to_serializable(create_nested(10), max_depth=max_depth) == create_nested(10)
def test_unlimited_depth_still_detects_dataclass_cycles():
@dataclass
class Node:
child: Any = None
node = Node()
node.child = node
assert to_serializable(node, max_depth=0) == {"child": "<circular_ref:Node>"}
def test_exclude_keys():
result = to_serializable({"key1": "value1", "key2": "value2"}, exclude={"key1"})
assert result == {"key2": "value2"}

18
uv.lock generated
View File

@@ -978,7 +978,7 @@ wheels = [
[[package]]
name = "chromadb"
version = "1.5.9"
version = "1.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "bcrypt" },
@@ -997,9 +997,9 @@ dependencies = [
{ name = "opentelemetry-sdk" },
{ name = "orjson" },
{ name = "overrides" },
{ name = "posthog" },
{ name = "pybase64" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pypika" },
{ name = "pyyaml" },
{ name = "rich" },
@@ -1010,13 +1010,13 @@ dependencies = [
{ name = "typing-extensions" },
{ name = "uvicorn", extra = ["standard"] },
]
sdist = { url = "https://files.pythonhosted.org/packages/92/d1/5e33b26985f0c7046a0be1cee2158ada1748ee700d2545057fde1468d74d/chromadb-1.5.9.tar.gz", hash = "sha256:5c20e62a455c28bacac927f26116a73fd8e1799e0d908be8e8a4f02197a54731", size = 2595635, upload-time = "2026-05-05T05:54:51.713Z" }
sdist = { url = "https://files.pythonhosted.org/packages/7f/48/11851dddeadad6abe36ee071fedc99b5bdd2c324df3afa8cb952ae02798b/chromadb-1.1.1.tar.gz", hash = "sha256:ebfce0122753e306a76f1e291d4ddaebe5f01b5979b97ae0bc80b1d4024ff223", size = 1338109, upload-time = "2025-10-05T02:49:14.834Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/dd/5b/3cced915244f43ed14b53fe9f63a37f05f865064f4e4fe7d9448d3f2a352/chromadb-1.5.9-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:60701011b5e6409647fa40d12c7c5a66b2b0bfcf33a52db2ad53a30a2abc4957", size = 22564540, upload-time = "2026-05-05T05:54:48.906Z" },
{ url = "https://files.pythonhosted.org/packages/34/4c/adcef1f4e82a2ef69ccd3711d55fc289193d54c4c0ff7a0292a3631db46f/chromadb-1.5.9-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:814b9c95617377f6501e5757d63dfddb554a283a7739c87b9fa573850174e6f3", size = 21699698, upload-time = "2026-05-05T05:54:45.078Z" },
{ url = "https://files.pythonhosted.org/packages/38/4e/937bc4d2e6f8ab9664ec79931fbbd69efff47e513ec2924b071e4b0ff774/chromadb-1.5.9-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9192d111bd662241625867962333d99369a00769a50f8b2f58cb388731274d7e", size = 22680924, upload-time = "2026-05-05T05:54:36.25Z" },
{ url = "https://files.pythonhosted.org/packages/e6/ec/0c42039e80b9acc534f67b73b7a42471948042859b3a64867b50a4a77fa3/chromadb-1.5.9-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cc09b3df76e5a5cb386aed2715a2eea152e3949f9e1ba93c7119505377749929", size = 23316203, upload-time = "2026-05-05T05:54:41.157Z" },
{ url = "https://files.pythonhosted.org/packages/eb/ce/0f7be6e5d0feafa2cda54b12e6542afeea7dea89d2d411e14da90f8abb96/chromadb-1.5.9-cp39-abi3-win_amd64.whl", hash = "sha256:4fd0b560e56761b7f3cb4d5c6205fd5f20814484b4a3e4e9af9038c2b428fc6c", size = 23542454, upload-time = "2026-05-05T05:54:54.942Z" },
{ url = "https://files.pythonhosted.org/packages/39/59/0d881a9b7eb63d8d2446cf67fcbb53fb8ae34991759d2b6024a067e90a9a/chromadb-1.1.1-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:27fe0e25ef0f83fb09c30355ab084fe6f246808a7ea29e8c19e85cf45785b90d", size = 19175479, upload-time = "2025-10-05T02:49:12.525Z" },
{ url = "https://files.pythonhosted.org/packages/94/4f/5a9fa317c84c98e70af48f74b00aa25589626c03a0428b4381b2095f3d73/chromadb-1.1.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:95aed58869683f12e7dcbf68b039fe5f576dbe9d1b86b8f4d014c9d077ccafd2", size = 18267188, upload-time = "2025-10-05T02:49:09.236Z" },
{ url = "https://files.pythonhosted.org/packages/45/1a/02defe2f1c8d1daedb084bbe85f5b6083510a3ba192ed57797a3649a4310/chromadb-1.1.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:06776dad41389a00e7d63d936c3a15c179d502becaf99f75745ee11b062c9b6a", size = 18855754, upload-time = "2025-10-05T02:49:03.299Z" },
{ url = "https://files.pythonhosted.org/packages/5a/0d/80be82717e5dc19839af24558494811b6f2af2b261a8f21c51b872193b09/chromadb-1.1.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bba0096a7f5e975875ead23a91c0d41d977fbd3767f60d3305a011b0ace7afd3", size = 19893681, upload-time = "2025-10-05T02:49:06.481Z" },
{ url = "https://files.pythonhosted.org/packages/2d/6e/956e62975305a4e31daf6114a73b3b0683a8f36f8d70b20aabd466770edb/chromadb-1.1.1-cp39-abi3-win_amd64.whl", hash = "sha256:a77aa026a73a18181fd89bbbdb86191c9a82fd42aa0b549ff18d8cae56394c8b", size = 19844042, upload-time = "2025-10-05T02:49:16.925Z" },
]
[[package]]
@@ -1421,7 +1421,7 @@ requires-dist = [
{ name = "boto3", marker = "extra == 'aws'", specifier = "~=1.42.79" },
{ name = "boto3", marker = "extra == 'bedrock'", specifier = "~=1.42.79" },
{ name = "cel-python", specifier = ">=0.5.0,<0.6" },
{ name = "chromadb", specifier = "~=1.5.9" },
{ name = "chromadb", specifier = "~=1.1.0" },
{ name = "click", specifier = ">=8.1.7,<9" },
{ name = "crewai-cli", editable = "lib/cli" },
{ name = "crewai-core", editable = "lib/crewai-core" },