Implement Flow definition run tools without Python code (#6144)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

A `do:` step can now say `call: tool` and name a CrewAI tool to run,
passing its inputs under `with:`. Before this, a definition could only
point at Python code to run.

```yaml
methods:
  search:
    start: true
    do:
      call: tool
      ref: crewai_tools:ExaSearchTool
      with:
        search_query: ai agents
```
This commit is contained in:
Vini Brasil
2026-06-12 19:47:58 -07:00
committed by GitHub
parent bf291a7a55
commit 2444895ca4
5 changed files with 155 additions and 5 deletions

View File

@@ -9,6 +9,7 @@ from typing_extensions import TypeIs
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowConfigDefinition,
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
@@ -83,7 +84,7 @@ def _stamp_inherited_conversational_metadata(
def _method_action(method: Any) -> FlowActionDefinition:
return FlowActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
return FlowCodeActionDefinition(ref=f"{method.__module__}:{method.__qualname__}")
def _set_flow_method_definition(

View File

@@ -28,6 +28,7 @@ FlowDefinitionCondition = str | dict[str, Any]
__all__ = [
"FlowActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -38,6 +39,7 @@ __all__ = [
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
]
@@ -142,13 +144,28 @@ class FlowHumanFeedbackDefinition(BaseModel):
return _object_ref(value)
class FlowActionDefinition(BaseModel):
"""What a Flow method node executes, independent of when it fires."""
class FlowCodeActionDefinition(BaseModel):
"""A Flow method action that executes importable Python code."""
model_config = ConfigDict(extra="forbid")
call: TypingLiteral["code"] = "code"
ref: str
class FlowToolActionDefinition(BaseModel):
"""A Flow method action that invokes a CrewAI tool."""
model_config = ConfigDict(populate_by_name=True, extra="forbid")
call: TypingLiteral["tool"]
ref: str
with_: dict[str, Any] | None = Field(default=None, alias="with")
FlowActionDefinition = FlowCodeActionDefinition | FlowToolActionDefinition
class FlowMethodDefinition(BaseModel):
"""Static definition of one Flow method and its execution roles."""

View File

@@ -13,7 +13,11 @@ import inspect
from operator import attrgetter
from typing import TYPE_CHECKING, Any, cast
from crewai.flow.flow_definition import FlowActionDefinition
from crewai.flow.flow_definition import (
FlowActionDefinition,
FlowCodeActionDefinition,
FlowToolActionDefinition,
)
if TYPE_CHECKING:
@@ -51,7 +55,7 @@ def resolve_instance_ref(ref: str, *, field: str) -> Any:
def _resolve_code_action(
flow: Flow[Any], action: FlowActionDefinition
flow: Flow[Any], action: FlowCodeActionDefinition
) -> Callable[..., Any]:
ref = action.ref
target = resolve_ref(ref, field="do")
@@ -63,8 +67,37 @@ def _resolve_code_action(
return handler
def _resolve_tool_action(
_flow: Flow[Any], action: FlowToolActionDefinition
) -> Callable[..., Any]:
target = resolve_ref(action.ref, field="do")
from crewai.tools import BaseTool
if not (inspect.isclass(target) and issubclass(target, BaseTool)):
raise InvalidRefError(
f"invalid tool ref {action.ref!r}; expected a BaseTool class"
)
try:
tool_cls = cast(Callable[[], BaseTool], target)
tool = tool_cls()
except Exception as e:
raise InvalidRefError(
f"cannot instantiate tool ref {action.ref!r} without arguments: {e}"
) from e
tool_kwargs = action.with_ or {}
def run_tool(*_args: Any, **_kwargs: Any) -> Any:
return tool.run(**tool_kwargs)
return run_tool
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
"""Turn one `do:` action into the callable the flow runs for that node."""
if action.call == "code":
return _resolve_code_action(flow, action)
if action.call == "tool":
return _resolve_tool_action(flow, action)
raise ValueError(f"unknown call type {action.call!r}")

View File

@@ -37,6 +37,7 @@ def test_flow_public_exports_are_explicit():
}
assert set(flow_definition.__all__) == {
"FlowActionDefinition",
"FlowCodeActionDefinition",
"FlowConfigDefinition",
"FlowConversationalDefinition",
"FlowConversationalRouterDefinition",
@@ -47,6 +48,7 @@ def test_flow_public_exports_are_explicit():
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowStateDefinition",
"FlowToolActionDefinition",
}
assert "build_flow_structure" in flow_visualization.__all__
assert "calculate_node_levels" not in flow_visualization.__all__

View File

@@ -24,9 +24,18 @@ from crewai.flow.flow_definition import FlowConfigDefinition, FlowDefinition
from crewai.flow.persistence import persist
from crewai.flow.persistence.base import FlowPersistence
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.tools import BaseTool
from crewai.types.streaming import FlowStreamingOutput
class StaticSearchTool(BaseTool):
name: str = "StaticSearchTool"
description: str = "Returns a deterministic search result."
def _run(self, search_query: str, prefix: str = "search") -> str:
return f"{prefix}:{search_query}"
class ChainFlow(Flow):
@start()
def begin(self):
@@ -490,6 +499,94 @@ def test_flow_definition_stamps_refs():
assert definition.methods["shout"].do.ref == f"{__name__}:ChainFlow.shout"
def test_from_definition_runs_tool_action_with_static_inputs():
yaml_str = f"""
schema: crewai.flow/v1
name: ToolFlow
methods:
search:
do:
call: tool
ref: {__name__}:StaticSearchTool
with:
search_query: ai agents
prefix: found
start: true
"""
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
assert flow.kickoff() == "found:ai agents"
def test_tool_action_round_trips_with_inputs():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {"search_query": "ai agents"},
},
}
},
}
)
assert definition.to_dict()["methods"]["search"]["do"] == {
"call": "tool",
"ref": f"{__name__}:StaticSearchTool",
"with": {"search_query": "ai agents"},
}
assert Flow.from_definition(definition).kickoff() == "search:ai agents"
def test_tool_action_requires_module_qualname_ref():
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ToolFlow",
"methods": {
"search": {
"start": True,
"do": {
"call": "tool",
"ref": f"{__name__}.StaticSearchTool",
"with": {"search_query": "ai agents"},
},
}
},
}
)
with pytest.raises(ValueError, match="expected 'module:qualname'"):
Flow.from_definition(definition)
def test_code_action_rejects_tool_inputs():
with pytest.raises(ValidationError):
FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "InvalidCodeActionFlow",
"methods": {
"begin": {
"start": True,
"do": {
"call": "code",
"ref": f"{__name__}:ChainFlow.begin",
"with": {"search_query": "ai agents"},
},
}
},
}
)
def test_pydantic_state_from_ref_parity():
flow, result = assert_parity(PydanticStateFlow, PYDANTIC_STATE_YAML)
assert result == "count=1"