feat: add support for agents to invoke Flows as tools

- Add FlowTool(BaseTool) in crewai/tools/flow_tool.py that wraps a Flow
  class as a callable tool (instantiates, calls kickoff, returns result)
- Add create_flow_tools() helper to convert list of Flow classes to tools
- Add Agent.flows parameter (list of Flow classes)
- Wire _set_flow_tools() into Agent.post_init_setup to auto-convert
  flows into tools and merge them into the agent's tool list
- Export FlowTool and create_flow_tools from crewai/__init__.py
- Add comprehensive tests (14 tests covering unit + integration)
This commit is contained in:
Joao Moura
2026-04-26 08:43:18 -07:00
parent 62d097a761
commit ef8b2ba03b
4 changed files with 293 additions and 0 deletions

View File

@@ -94,10 +94,16 @@ try:
} }
from crewai.tools.base_tool import BaseTool as _BaseTool from crewai.tools.base_tool import BaseTool as _BaseTool
from crewai.tools.flow_tool import (
FlowTool as _FlowTool,
create_flow_tools as _create_flow_tools,
)
from crewai.tools.structured_tool import CrewStructuredTool as _CrewStructuredTool from crewai.tools.structured_tool import CrewStructuredTool as _CrewStructuredTool
_base_namespace["BaseTool"] = _BaseTool _base_namespace["BaseTool"] = _BaseTool
_base_namespace["CrewStructuredTool"] = _CrewStructuredTool _base_namespace["CrewStructuredTool"] = _CrewStructuredTool
_base_namespace["FlowTool"] = _FlowTool
_base_namespace["create_flow_tools"] = _create_flow_tools
try: try:
from crewai.a2a.config import ( from crewai.a2a.config import (

View File

@@ -85,6 +85,7 @@ from crewai.skills.loader import activate_skill, discover_skills
from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel
from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint
from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.flow_tool import create_flow_tools
from crewai.types.callback import SerializableCallable from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import ( from crewai.utilities.agent_utils import (
get_tool_names, get_tool_names,
@@ -305,6 +306,10 @@ class Agent(BaseAgent):
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig. Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""", """,
) )
flows: list[Any] | None = Field(
default=None,
description="Flow classes that the agent can invoke as tools. Each entry is a Flow subclass (not an instance).",
)
agent_executor: CrewAgentExecutor | AgentExecutor | None = Field( agent_executor: CrewAgentExecutor | AgentExecutor | None = Field(
default=None, description="An instance of the CrewAgentExecutor class." default=None, description="An instance of the CrewAgentExecutor class."
) )
@@ -347,6 +352,7 @@ class Agent(BaseAgent):
) )
self.set_skills() self.set_skills()
self._set_flow_tools()
if self.reasoning and self.planning_config is None: if self.reasoning and self.planning_config is None:
warnings.warn( warnings.warn(
@@ -459,6 +465,16 @@ class Agent(BaseAgent):
self.skills = resolved if resolved else None self.skills = resolved if resolved else None
def _set_flow_tools(self) -> None:
"""Convert Flow classes in ``self.flows`` to tools and merge them."""
if not self.flows:
return
flow_tools = create_flow_tools(self.flows)
if flow_tools:
if self.tools is None:
self.tools = []
self.tools.extend(flow_tools)
def _is_any_available_memory(self) -> bool: def _is_any_available_memory(self) -> bool:
"""Check if unified memory is available (agent or crew).""" """Check if unified memory is available (agent or crew)."""
if getattr(self, "memory", None): if getattr(self, "memory", None):

View File

@@ -0,0 +1,82 @@
"""Wrap Flow classes as callable tools so agents can invoke them."""
from __future__ import annotations
import json
from typing import Any
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.string_utils import sanitize_tool_name
class FlowToolInputSchema(BaseModel):
"""Default input schema for a FlowTool."""
inputs: str = Field(
default="{}",
description=(
"JSON string of key-value pairs to pass as inputs to the flow. "
"Use '{}' if the flow requires no inputs."
),
)
class FlowTool(BaseTool):
"""Wraps a Flow class as a BaseTool so an agent can invoke it.
The tool instantiates the Flow, calls ``kickoff(inputs=...)`` and returns
the result as a string.
"""
name: str = ""
description: str = ""
flow_class: Any = Field(
default=None,
description="The Flow class (not instance) to wrap.",
exclude=True,
)
args_schema: Any = FlowToolInputSchema
def _run(self, inputs: str = "{}") -> str:
"""Instantiate the Flow, run kickoff, and return the result."""
try:
parsed_inputs = json.loads(inputs) if isinstance(inputs, str) else inputs
except (json.JSONDecodeError, TypeError):
parsed_inputs = {}
if not isinstance(parsed_inputs, dict):
parsed_inputs = {}
flow_instance = self.flow_class()
result = flow_instance.kickoff(inputs=parsed_inputs if parsed_inputs else None)
return str(result)
def create_flow_tools(flows: list[type] | None) -> list[BaseTool]:
"""Convert a list of Flow classes into BaseTool wrappers.
Args:
flows: Flow classes (not instances) to wrap as tools.
Returns:
A list of FlowTool instances ready for agent use.
"""
if not flows:
return []
tools: list[BaseTool] = []
for flow_cls in flows:
name = sanitize_tool_name(flow_cls.__name__)
docstring = (flow_cls.__doc__ or "").strip()
description = docstring if docstring else f"Run the {flow_cls.__name__} flow."
tools.append(
FlowTool(
name=name,
description=description,
flow_class=flow_cls,
)
)
return tools

View File

@@ -0,0 +1,189 @@
"""Tests for Flow-as-tool functionality."""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, start
from crewai.tools.flow_tool import FlowTool, create_flow_tools
# ---------------------------------------------------------------------------
# Test Flow classes
# ---------------------------------------------------------------------------
class SimpleFlow(Flow):
"""A simple flow that greets the user."""
@start()
def greet(self) -> str:
return "Hello from SimpleFlow!"
class MathFlow(Flow):
"""Performs basic math operations."""
@start()
def compute(self) -> str:
return "42"
class NoDocFlow(Flow):
@start()
def run_it(self) -> str:
return "no doc"
# ---------------------------------------------------------------------------
# FlowTool unit tests
# ---------------------------------------------------------------------------
class TestFlowTool:
def test_wrap_simple_flow(self) -> None:
tool = FlowTool(
name="simple_flow",
description="A simple flow that greets the user.",
flow_class=SimpleFlow,
)
assert tool.name == "simple_flow"
assert "greets the user" in tool.description
def test_run_invokes_kickoff(self) -> None:
mock_flow = MagicMock()
mock_flow.return_value = mock_flow # __init__ returns self
mock_flow.kickoff.return_value = "mocked result"
tool = FlowTool(
name="test_flow",
description="test",
flow_class=mock_flow,
)
result = tool._run(inputs="{}")
assert result == "mocked result"
mock_flow.kickoff.assert_called_once()
def test_run_with_json_inputs(self) -> None:
mock_flow = MagicMock()
mock_flow.return_value = mock_flow
mock_flow.kickoff.return_value = "result with inputs"
tool = FlowTool(
name="test_flow",
description="test",
flow_class=mock_flow,
)
result = tool._run(inputs='{"key": "value"}')
assert result == "result with inputs"
mock_flow.kickoff.assert_called_once_with(inputs={"key": "value"})
def test_run_with_invalid_json_defaults_to_empty(self) -> None:
mock_flow = MagicMock()
mock_flow.return_value = mock_flow
mock_flow.kickoff.return_value = "ok"
tool = FlowTool(
name="test_flow",
description="test",
flow_class=mock_flow,
)
result = tool._run(inputs="not valid json")
assert result == "ok"
mock_flow.kickoff.assert_called_once_with(inputs=None)
def test_run_returns_string(self) -> None:
mock_flow = MagicMock()
mock_flow.return_value = mock_flow
mock_flow.kickoff.return_value = 42
tool = FlowTool(
name="test_flow",
description="test",
flow_class=mock_flow,
)
result = tool._run()
assert result == "42"
assert isinstance(result, str)
# ---------------------------------------------------------------------------
# create_flow_tools tests
# ---------------------------------------------------------------------------
class TestCreateFlowTools:
def test_creates_tools_from_flow_classes(self) -> None:
tools = create_flow_tools([SimpleFlow, MathFlow])
assert len(tools) == 2
names = {t.name for t in tools}
assert "simple_flow" in names
assert "math_flow" in names
def test_description_from_docstring(self) -> None:
tools = create_flow_tools([SimpleFlow])
assert len(tools) == 1
assert "greets the user" in tools[0].description
def test_description_fallback_when_no_docstring(self) -> None:
tools = create_flow_tools([NoDocFlow])
assert len(tools) == 1
assert "NoDocFlow" in tools[0].description
def test_empty_list_returns_empty(self) -> None:
assert create_flow_tools([]) == []
def test_none_returns_empty(self) -> None:
assert create_flow_tools(None) == []
def test_tools_are_base_tool_instances(self) -> None:
from crewai.tools.base_tool import BaseTool
tools = create_flow_tools([SimpleFlow])
for tool in tools:
assert isinstance(tool, BaseTool)
# ---------------------------------------------------------------------------
# Agent integration tests
# ---------------------------------------------------------------------------
class TestAgentFlowIntegration:
def test_agent_with_flows_has_flow_tools(self) -> None:
from crewai.agent.core import Agent
agent = Agent(
role="Test Agent",
goal="Test flows",
backstory="I test things",
flows=[SimpleFlow, MathFlow],
)
tool_names = {t.name for t in (agent.tools or [])}
assert "simple_flow" in tool_names
assert "math_flow" in tool_names
def test_agent_without_flows_no_extra_tools(self) -> None:
from crewai.agent.core import Agent
agent = Agent(
role="Test Agent",
goal="Test",
backstory="I test things",
)
# Should not have any flow tools
flow_tool_names = {
t.name for t in (agent.tools or []) if isinstance(t, FlowTool)
}
assert len(flow_tool_names) == 0
def test_flow_tool_executes_real_flow(self) -> None:
"""Test that a FlowTool actually runs the Flow's kickoff."""
tools = create_flow_tools([SimpleFlow])
tool = tools[0]
result = tool.run(inputs="{}")
assert "Hello from SimpleFlow" in result