mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-06-03 15:28:10 +00:00
Compare commits
1 Commits
gui/monore
...
flow-struc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
424806cc41 |
@@ -6,7 +6,6 @@ from crewai.flow.async_feedback import (
|
||||
)
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow_config import flow_config
|
||||
from crewai.flow.flow_serializer import flow_structure
|
||||
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
|
||||
from crewai.flow.input_provider import InputProvider, InputResponse
|
||||
from crewai.flow.persistence import persist
|
||||
@@ -30,7 +29,6 @@ __all__ = [
|
||||
"and_",
|
||||
"build_flow_structure",
|
||||
"flow_config",
|
||||
"flow_structure",
|
||||
"human_feedback",
|
||||
"listen",
|
||||
"or_",
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -3,8 +3,8 @@
|
||||
The implementation now lives in three modules, split by concern:
|
||||
|
||||
- ``crewai.flow.dsl`` -- authoring decorators (``@start`` / ``@listen`` /
|
||||
``@router``, ``or_`` / ``and_``)
|
||||
- ``crewai.flow.flow_definition`` -- the structural model extracted from the DSL
|
||||
``@router``, ``or_`` / ``and_``) and Python Flow class projection
|
||||
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
|
||||
- ``crewai.flow.runtime`` -- the Flow execution engine and state
|
||||
|
||||
Prefer importing from those modules in new code; this module preserves the
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,592 +0,0 @@
|
||||
"""Flow structure serializer for introspecting Flow classes.
|
||||
|
||||
This module provides the flow_structure() function that analyzes a Flow class
|
||||
and returns a JSON-serializable dictionary describing its graph structure.
|
||||
This is used by Studio UI to render a visual flow graph.
|
||||
|
||||
Example:
|
||||
>>> from crewai.flow import Flow, start, listen
|
||||
>>> from crewai.flow.flow_serializer import flow_structure
|
||||
>>>
|
||||
>>> class MyFlow(Flow):
|
||||
... @start()
|
||||
... def begin(self):
|
||||
... return "started"
|
||||
...
|
||||
... @listen(begin)
|
||||
... def process(self):
|
||||
... return "done"
|
||||
>>>
|
||||
>>> structure = flow_structure(MyFlow)
|
||||
>>> print(structure["name"])
|
||||
'MyFlow'
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
import textwrap
|
||||
from typing import Any, TypedDict, get_args, get_origin
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic_core import PydanticUndefined
|
||||
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowCondition,
|
||||
FlowMethod,
|
||||
ListenMethod,
|
||||
RouterMethod,
|
||||
StartMethod,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MethodInfo(TypedDict, total=False):
|
||||
"""Information about a single flow method.
|
||||
|
||||
Attributes:
|
||||
name: The method name.
|
||||
type: Method type - start, listen, router, or start_router.
|
||||
trigger_methods: List of method names that trigger this method.
|
||||
condition_type: 'AND' or 'OR' for composite conditions, null otherwise.
|
||||
router_paths: For routers, the possible route names returned.
|
||||
has_human_feedback: Whether the method has @human_feedback decorator.
|
||||
has_crew: Whether the method body references a Crew.
|
||||
"""
|
||||
|
||||
name: str
|
||||
type: str
|
||||
trigger_methods: list[str]
|
||||
condition_type: str | None
|
||||
router_paths: list[str]
|
||||
has_human_feedback: bool
|
||||
has_crew: bool
|
||||
|
||||
|
||||
class EdgeInfo(TypedDict, total=False):
|
||||
"""Information about an edge between flow methods.
|
||||
|
||||
Attributes:
|
||||
from_method: Source method name.
|
||||
to_method: Target method name.
|
||||
edge_type: Type of edge - 'listen' or 'route'.
|
||||
condition: Route name for router edges, null for listen edges.
|
||||
"""
|
||||
|
||||
from_method: str
|
||||
to_method: str
|
||||
edge_type: str
|
||||
condition: str | None
|
||||
|
||||
|
||||
class StateFieldInfo(TypedDict, total=False):
|
||||
"""Information about a state field.
|
||||
|
||||
Attributes:
|
||||
name: Field name.
|
||||
type: Field type as string.
|
||||
default: Default value if any.
|
||||
"""
|
||||
|
||||
name: str
|
||||
type: str
|
||||
default: Any
|
||||
|
||||
|
||||
class StateSchemaInfo(TypedDict, total=False):
|
||||
"""Information about the flow's state schema.
|
||||
|
||||
Attributes:
|
||||
fields: List of field information.
|
||||
"""
|
||||
|
||||
fields: list[StateFieldInfo]
|
||||
|
||||
|
||||
class FlowStructureInfo(TypedDict, total=False):
|
||||
"""Complete flow structure information.
|
||||
|
||||
Attributes:
|
||||
name: Flow class name.
|
||||
description: Flow docstring if available.
|
||||
methods: List of method information.
|
||||
edges: List of edge information.
|
||||
state_schema: State schema if typed, null otherwise.
|
||||
inputs: Detected flow inputs if available.
|
||||
"""
|
||||
|
||||
name: str
|
||||
description: str | None
|
||||
methods: list[MethodInfo]
|
||||
edges: list[EdgeInfo]
|
||||
state_schema: StateSchemaInfo | None
|
||||
inputs: list[str]
|
||||
|
||||
|
||||
def _get_method_type(
|
||||
method_name: str,
|
||||
method: Any,
|
||||
start_methods: list[str],
|
||||
routers: set[str],
|
||||
) -> str:
|
||||
"""Determine the type of a flow method.
|
||||
|
||||
Args:
|
||||
method_name: Name of the method.
|
||||
method: The method object.
|
||||
start_methods: List of start method names.
|
||||
routers: Set of router method names.
|
||||
|
||||
Returns:
|
||||
One of: 'start', 'listen', 'router', or 'start_router'.
|
||||
"""
|
||||
is_start = method_name in start_methods or getattr(
|
||||
method, "__is_start_method__", False
|
||||
)
|
||||
is_router = method_name in routers or getattr(method, "__is_router__", False)
|
||||
|
||||
if is_start and is_router:
|
||||
return "start_router"
|
||||
if is_start:
|
||||
return "start"
|
||||
if is_router:
|
||||
return "router"
|
||||
return "listen"
|
||||
|
||||
|
||||
def _has_human_feedback(method: Any) -> bool:
|
||||
"""Check if a method has the @human_feedback decorator.
|
||||
|
||||
Args:
|
||||
method: The method object to check.
|
||||
|
||||
Returns:
|
||||
True if the method has __human_feedback_config__ attribute.
|
||||
"""
|
||||
return hasattr(method, "__human_feedback_config__")
|
||||
|
||||
|
||||
def _detect_crew_reference(method: Any) -> bool:
|
||||
"""Detect if a method body references a Crew.
|
||||
|
||||
Checks for patterns like:
|
||||
- .crew() method calls
|
||||
- Crew( instantiation
|
||||
- References to Crew class in type hints
|
||||
|
||||
Note:
|
||||
This is a **best-effort heuristic for UI hints**, not a guarantee.
|
||||
Uses inspect.getsource + regex which can false-positive on comments
|
||||
or string literals, and may fail on dynamically generated methods
|
||||
or lambdas. Do not rely on this for correctness-critical logic.
|
||||
|
||||
Args:
|
||||
method: The method object to inspect.
|
||||
|
||||
Returns:
|
||||
True if crew reference detected, False otherwise.
|
||||
"""
|
||||
try:
|
||||
func = method
|
||||
if hasattr(method, "_meth"):
|
||||
func = method._meth
|
||||
elif hasattr(method, "__wrapped__"):
|
||||
func = method.__wrapped__
|
||||
|
||||
source = inspect.getsource(func)
|
||||
source = textwrap.dedent(source)
|
||||
|
||||
crew_patterns = [
|
||||
r"\.crew\(\)", # .crew() method call
|
||||
r"Crew\s*\(", # Crew( instantiation
|
||||
r":\s*Crew\b", # Type hint with Crew
|
||||
r"->.*Crew", # Return type hint with Crew
|
||||
]
|
||||
|
||||
for pattern in crew_patterns:
|
||||
if re.search(pattern, source):
|
||||
return True
|
||||
|
||||
return False
|
||||
except (OSError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def _extract_trigger_methods(method: Any) -> tuple[list[str], str | None]:
|
||||
"""Extract trigger methods and condition type from a method.
|
||||
|
||||
Args:
|
||||
method: The method object to inspect.
|
||||
|
||||
Returns:
|
||||
Tuple of (trigger_methods list, condition_type or None).
|
||||
"""
|
||||
trigger_methods: list[str] = []
|
||||
condition_type: str | None = None
|
||||
|
||||
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
|
||||
trigger_methods = [str(m) for m in method.__trigger_methods__]
|
||||
|
||||
# For complex conditions (or_/and_ combinators), extract from __trigger_condition__
|
||||
if (
|
||||
not trigger_methods
|
||||
and hasattr(method, "__trigger_condition__")
|
||||
and method.__trigger_condition__
|
||||
):
|
||||
trigger_condition = method.__trigger_condition__
|
||||
trigger_methods = _extract_all_methods_from_condition(trigger_condition)
|
||||
|
||||
if hasattr(method, "__condition_type__") and method.__condition_type__:
|
||||
condition_type = str(method.__condition_type__)
|
||||
|
||||
return trigger_methods, condition_type
|
||||
|
||||
|
||||
def _extract_router_paths(
|
||||
method: Any, router_paths_registry: dict[str, list[str]]
|
||||
) -> list[str]:
|
||||
"""Extract router paths for a router method.
|
||||
|
||||
Args:
|
||||
method: The method object.
|
||||
router_paths_registry: The class-level _router_paths dict.
|
||||
|
||||
Returns:
|
||||
List of possible route names.
|
||||
"""
|
||||
method_name = getattr(method, "__name__", "")
|
||||
|
||||
if hasattr(method, "__router_paths__") and method.__router_paths__:
|
||||
return [str(p) for p in method.__router_paths__]
|
||||
|
||||
if method_name in router_paths_registry:
|
||||
return [str(p) for p in router_paths_registry[method_name]]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def _extract_all_methods_from_condition(
|
||||
condition: str | FlowCondition | dict[str, Any] | list[Any],
|
||||
) -> list[str]:
|
||||
"""Extract all method names from a condition tree recursively.
|
||||
|
||||
Args:
|
||||
condition: Can be a string, FlowCondition tuple, dict, or list.
|
||||
|
||||
Returns:
|
||||
List of all method names found in the condition.
|
||||
"""
|
||||
if isinstance(condition, str):
|
||||
return [condition]
|
||||
if isinstance(condition, tuple) and len(condition) == 2:
|
||||
# FlowCondition: (condition_type, methods_list)
|
||||
_, methods = condition
|
||||
if isinstance(methods, list):
|
||||
result: list[str] = []
|
||||
for m in methods:
|
||||
result.extend(_extract_all_methods_from_condition(m))
|
||||
return result
|
||||
return []
|
||||
if isinstance(condition, dict):
|
||||
conditions_list = condition.get("conditions", [])
|
||||
dict_methods: list[str] = []
|
||||
for sub_cond in conditions_list:
|
||||
dict_methods.extend(_extract_all_methods_from_condition(sub_cond))
|
||||
return dict_methods
|
||||
if isinstance(condition, list):
|
||||
list_methods: list[str] = []
|
||||
for item in condition:
|
||||
list_methods.extend(_extract_all_methods_from_condition(item))
|
||||
return list_methods
|
||||
return []
|
||||
|
||||
|
||||
def _generate_edges(
|
||||
listeners: dict[str, tuple[str, list[str]] | FlowCondition],
|
||||
routers: set[str],
|
||||
router_paths: dict[str, list[str]],
|
||||
all_methods: set[str],
|
||||
) -> list[EdgeInfo]:
|
||||
"""Generate edges from listeners and routers.
|
||||
|
||||
Args:
|
||||
listeners: Map of listener_name -> (condition_type, trigger_methods) or FlowCondition.
|
||||
routers: Set of router method names.
|
||||
router_paths: Map of router_name -> possible return values.
|
||||
all_methods: Set of all method names in the flow.
|
||||
|
||||
Returns:
|
||||
List of EdgeInfo dictionaries.
|
||||
"""
|
||||
edges: list[EdgeInfo] = []
|
||||
|
||||
for listener_name, condition_data in listeners.items():
|
||||
trigger_methods: list[str] = []
|
||||
|
||||
if isinstance(condition_data, tuple) and len(condition_data) == 2:
|
||||
_condition_type, methods = condition_data
|
||||
trigger_methods = [str(m) for m in methods]
|
||||
elif isinstance(condition_data, dict):
|
||||
trigger_methods = _extract_all_methods_from_condition(condition_data)
|
||||
|
||||
edges.extend(
|
||||
EdgeInfo(
|
||||
from_method=trigger,
|
||||
to_method=listener_name,
|
||||
edge_type="listen",
|
||||
condition=None,
|
||||
)
|
||||
for trigger in trigger_methods
|
||||
if trigger in all_methods
|
||||
)
|
||||
|
||||
for router_name, paths in router_paths.items():
|
||||
for path in paths:
|
||||
for listener_name, condition_data in listeners.items():
|
||||
path_triggers: list[str] = []
|
||||
|
||||
if isinstance(condition_data, tuple) and len(condition_data) == 2:
|
||||
_, methods = condition_data
|
||||
path_triggers = [str(m) for m in methods]
|
||||
elif isinstance(condition_data, dict):
|
||||
path_triggers = _extract_all_methods_from_condition(condition_data)
|
||||
|
||||
if str(path) in path_triggers:
|
||||
edges.append(
|
||||
EdgeInfo(
|
||||
from_method=router_name,
|
||||
to_method=listener_name,
|
||||
edge_type="route",
|
||||
condition=str(path),
|
||||
)
|
||||
)
|
||||
|
||||
return edges
|
||||
|
||||
|
||||
def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
|
||||
"""Extract state schema from a Flow class.
|
||||
|
||||
Checks for:
|
||||
- Generic type parameter (Flow[MyState])
|
||||
- initial_state class attribute
|
||||
|
||||
Args:
|
||||
flow_class: The Flow class to inspect.
|
||||
|
||||
Returns:
|
||||
StateSchemaInfo if a Pydantic model state is detected, None otherwise.
|
||||
"""
|
||||
state_type: type | None = None
|
||||
|
||||
# _initial_state_t is set by Flow.__class_getitem__
|
||||
if hasattr(flow_class, "_initial_state_t"):
|
||||
state_type = flow_class._initial_state_t
|
||||
|
||||
if state_type is None and hasattr(flow_class, "initial_state"):
|
||||
initial_state = flow_class.initial_state
|
||||
if isinstance(initial_state, type) and issubclass(initial_state, BaseModel):
|
||||
state_type = initial_state
|
||||
elif isinstance(initial_state, BaseModel):
|
||||
state_type = type(initial_state)
|
||||
|
||||
if state_type is None and hasattr(flow_class, "__orig_bases__"):
|
||||
for base in flow_class.__orig_bases__:
|
||||
origin = get_origin(base)
|
||||
if origin is not None:
|
||||
args = get_args(base)
|
||||
if args:
|
||||
candidate = args[0]
|
||||
if isinstance(candidate, type) and issubclass(candidate, BaseModel):
|
||||
state_type = candidate
|
||||
break
|
||||
|
||||
if state_type is None or not issubclass(state_type, BaseModel):
|
||||
return None
|
||||
|
||||
fields: list[StateFieldInfo] = []
|
||||
try:
|
||||
model_fields = state_type.model_fields
|
||||
for field_name, field_info in model_fields.items():
|
||||
field_type_str = "Any"
|
||||
if field_info.annotation is not None:
|
||||
field_type_str = str(field_info.annotation)
|
||||
field_type_str = field_type_str.replace("typing.", "")
|
||||
field_type_str = field_type_str.replace("<class '", "").replace(
|
||||
"'>", ""
|
||||
)
|
||||
|
||||
default_value = None
|
||||
if (
|
||||
field_info.default is not PydanticUndefined
|
||||
and field_info.default is not None
|
||||
and not callable(field_info.default)
|
||||
):
|
||||
try:
|
||||
default_value = field_info.default
|
||||
except Exception:
|
||||
default_value = str(field_info.default)
|
||||
|
||||
fields.append(
|
||||
StateFieldInfo(
|
||||
name=field_name,
|
||||
type=field_type_str,
|
||||
default=default_value,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Failed to extract state schema fields for %s", flow_class.__name__
|
||||
)
|
||||
|
||||
return StateSchemaInfo(fields=fields) if fields else None
|
||||
|
||||
|
||||
def _detect_flow_inputs(flow_class: type) -> list[str]:
|
||||
"""Detect flow input parameters.
|
||||
|
||||
Inspects the __init__ signature for custom parameters beyond standard Flow params.
|
||||
|
||||
Args:
|
||||
flow_class: The Flow class to inspect.
|
||||
|
||||
Returns:
|
||||
List of detected input names.
|
||||
"""
|
||||
inputs: list[str] = []
|
||||
|
||||
try:
|
||||
init_method = flow_class.__init__ # type: ignore[misc]
|
||||
init_sig = inspect.signature(init_method)
|
||||
standard_params = {
|
||||
"self",
|
||||
"persistence",
|
||||
"tracing",
|
||||
"suppress_flow_events",
|
||||
"max_method_calls",
|
||||
"kwargs",
|
||||
}
|
||||
inputs.extend(
|
||||
param_name
|
||||
for param_name in init_sig.parameters
|
||||
if param_name not in standard_params and not param_name.startswith("_")
|
||||
)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Failed to detect inputs from __init__ for %s", flow_class.__name__
|
||||
)
|
||||
|
||||
return inputs
|
||||
|
||||
|
||||
def flow_structure(flow_class: type) -> FlowStructureInfo:
|
||||
"""Introspect a Flow class and return its structure as a JSON-serializable dict.
|
||||
|
||||
This function analyzes a Flow CLASS (not instance) and returns complete
|
||||
information about its graph structure including methods, edges, and state.
|
||||
|
||||
Args:
|
||||
flow_class: A Flow class (not an instance) to introspect.
|
||||
|
||||
Returns:
|
||||
FlowStructureInfo dictionary containing:
|
||||
- name: Flow class name
|
||||
- description: Docstring if available
|
||||
- methods: List of method info dicts
|
||||
- edges: List of edge info dicts
|
||||
- state_schema: State schema if typed, None otherwise
|
||||
- inputs: Detected input names
|
||||
|
||||
Raises:
|
||||
TypeError: If flow_class is not a class.
|
||||
|
||||
Example:
|
||||
>>> structure = flow_structure(MyFlow)
|
||||
>>> print(structure["name"])
|
||||
'MyFlow'
|
||||
>>> for method in structure["methods"]:
|
||||
... print(method["name"], method["type"])
|
||||
"""
|
||||
if not isinstance(flow_class, type):
|
||||
raise TypeError(
|
||||
f"flow_structure requires a Flow class, not an instance. "
|
||||
f"Got {type(flow_class).__name__}"
|
||||
)
|
||||
|
||||
start_methods: list[str] = getattr(flow_class, "_start_methods", [])
|
||||
listeners: dict[str, Any] = getattr(flow_class, "_listeners", {})
|
||||
routers: set[str] = getattr(flow_class, "_routers", set())
|
||||
router_paths_registry: dict[str, list[str]] = getattr(
|
||||
flow_class, "_router_paths", {}
|
||||
)
|
||||
|
||||
methods: list[MethodInfo] = []
|
||||
all_method_names: set[str] = set()
|
||||
|
||||
for attr_name in dir(flow_class):
|
||||
if attr_name.startswith("_"):
|
||||
continue
|
||||
|
||||
try:
|
||||
attr = getattr(flow_class, attr_name)
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
is_flow_method = (
|
||||
isinstance(attr, (FlowMethod, StartMethod, ListenMethod, RouterMethod))
|
||||
or hasattr(attr, "__is_flow_method__")
|
||||
or hasattr(attr, "__is_start_method__")
|
||||
or hasattr(attr, "__trigger_methods__")
|
||||
or hasattr(attr, "__is_router__")
|
||||
)
|
||||
|
||||
if not is_flow_method:
|
||||
continue
|
||||
|
||||
all_method_names.add(attr_name)
|
||||
|
||||
method_type = _get_method_type(attr_name, attr, start_methods, routers)
|
||||
|
||||
trigger_methods, condition_type = _extract_trigger_methods(attr)
|
||||
|
||||
router_paths_list: list[str] = []
|
||||
if method_type in ("router", "start_router"):
|
||||
router_paths_list = _extract_router_paths(attr, router_paths_registry)
|
||||
|
||||
has_hf = _has_human_feedback(attr)
|
||||
|
||||
has_crew = _detect_crew_reference(attr)
|
||||
|
||||
method_info = MethodInfo(
|
||||
name=attr_name,
|
||||
type=method_type,
|
||||
trigger_methods=trigger_methods,
|
||||
condition_type=condition_type,
|
||||
router_paths=router_paths_list,
|
||||
has_human_feedback=has_hf,
|
||||
has_crew=has_crew,
|
||||
)
|
||||
methods.append(method_info)
|
||||
|
||||
edges = _generate_edges(listeners, routers, router_paths_registry, all_method_names)
|
||||
|
||||
state_schema = _extract_state_schema(flow_class)
|
||||
|
||||
inputs = _detect_flow_inputs(flow_class)
|
||||
|
||||
description: str | None = None
|
||||
if flow_class.__doc__:
|
||||
description = flow_class.__doc__.strip()
|
||||
|
||||
return FlowStructureInfo(
|
||||
name=flow_class.__name__,
|
||||
description=description,
|
||||
methods=methods,
|
||||
edges=edges,
|
||||
state_schema=state_schema,
|
||||
inputs=inputs,
|
||||
)
|
||||
@@ -18,6 +18,17 @@ R = TypeVar("R")
|
||||
FlowConditionType: TypeAlias = Literal["OR", "AND"]
|
||||
SimpleFlowCondition: TypeAlias = tuple[FlowConditionType, list[FlowMethodName]]
|
||||
|
||||
__all__ = [
|
||||
"FlowCondition",
|
||||
"FlowConditionType",
|
||||
"FlowConditions",
|
||||
"FlowMethod",
|
||||
"ListenMethod",
|
||||
"RouterMethod",
|
||||
"SimpleFlowCondition",
|
||||
"StartMethod",
|
||||
]
|
||||
|
||||
|
||||
class FlowCondition(TypedDict, total=False):
|
||||
"""Type definition for flow trigger conditions.
|
||||
@@ -73,9 +84,11 @@ class FlowMethod(Generic[P, R]):
|
||||
# Preserve flow-related attributes from wrapped method (e.g., from @human_feedback)
|
||||
for attr in [
|
||||
"__is_router__",
|
||||
"__router_paths__",
|
||||
"__router_emit__",
|
||||
"__human_feedback_config__",
|
||||
"_hf_llm", # Live LLM object for HITL resume
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm", # Live LLM object for HITL resume
|
||||
]:
|
||||
if hasattr(meth, attr):
|
||||
setattr(self, attr, getattr(meth, attr))
|
||||
@@ -165,3 +178,4 @@ class RouterMethod(FlowMethod[P, R]):
|
||||
__trigger_methods__: list[FlowMethodName] | None = None
|
||||
__condition_type__: FlowConditionType | None = None
|
||||
__trigger_condition__: FlowCondition | None = None
|
||||
__router_emit__: list[str] | None = None
|
||||
|
||||
@@ -65,6 +65,7 @@ from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.flow.flow_definition import FlowMethodDefinition
|
||||
from crewai.flow.flow_wrappers import FlowMethod
|
||||
|
||||
|
||||
@@ -78,14 +79,10 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
F = TypeVar("F", bound=Callable[..., Any])
|
||||
|
||||
__all__ = ["HumanFeedbackResult", "human_feedback"]
|
||||
|
||||
|
||||
def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
|
||||
"""Serialize a BaseLLM object to a dict preserving full config.
|
||||
|
||||
Delegates to ``llm.to_config_dict()`` when available (BaseLLM and
|
||||
subclasses). Falls back to extracting the model string with provider
|
||||
prefix for unknown LLM types.
|
||||
"""
|
||||
to_config: Callable[[], dict[str, Any]] | None = getattr(
|
||||
llm, "to_config_dict", None
|
||||
)
|
||||
@@ -103,13 +100,6 @@ def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
|
||||
def _deserialize_llm_from_context(
|
||||
llm_data: dict[str, Any] | str | None,
|
||||
) -> BaseLLM | None:
|
||||
"""Reconstruct an LLM instance from serialized context data.
|
||||
|
||||
Handles both the new dict format (with full config) and the legacy
|
||||
string format (model name only) for backward compatibility.
|
||||
|
||||
Returns a BaseLLM instance, or None if llm_data is None.
|
||||
"""
|
||||
if llm_data is None:
|
||||
return None
|
||||
|
||||
@@ -202,12 +192,12 @@ class HumanFeedbackMethod(FlowMethod[Any, Any]):
|
||||
|
||||
Attributes:
|
||||
__is_router__: True when emit is specified, enabling router behavior.
|
||||
__router_paths__: List of possible outcomes when acting as a router.
|
||||
__router_emit__: List of possible outcomes when acting as a router.
|
||||
__human_feedback_config__: The HumanFeedbackConfig for this method.
|
||||
"""
|
||||
|
||||
__is_router__: bool = False
|
||||
__router_paths__: list[str] | None = None
|
||||
__router_emit__: list[str] | None = None
|
||||
__human_feedback_config__: HumanFeedbackConfig | None = None
|
||||
|
||||
|
||||
@@ -356,20 +346,12 @@ def human_feedback(
|
||||
raise ValueError("default_outcome requires emit to be specified.")
|
||||
|
||||
def decorator(func: F) -> F:
|
||||
"""Inner decorator that wraps the function."""
|
||||
|
||||
def _get_hitl_prompt(key: str) -> str:
|
||||
"""Read a HITL prompt from the i18n translations."""
|
||||
from crewai.utilities.i18n import I18N_DEFAULT
|
||||
|
||||
return I18N_DEFAULT.slice(key)
|
||||
|
||||
def _resolve_llm_instance() -> Any:
|
||||
"""Resolve the ``llm`` parameter to a BaseLLM instance.
|
||||
|
||||
Uses the SAME model specified in the decorator so pre-review,
|
||||
distillation, and outcome collapsing all share one model.
|
||||
"""
|
||||
if llm is None:
|
||||
from crewai.llm import LLM
|
||||
|
||||
@@ -383,7 +365,6 @@ def human_feedback(
|
||||
def _pre_review_with_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> Any:
|
||||
"""Recall past HITL lessons and use LLM to pre-review the output."""
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
@@ -431,7 +412,6 @@ def human_feedback(
|
||||
def _distill_and_store_lessons(
|
||||
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
|
||||
) -> None:
|
||||
"""Extract generalizable lessons from output + feedback, store in memory."""
|
||||
try:
|
||||
mem = flow_instance.memory
|
||||
if mem is None:
|
||||
@@ -485,7 +465,6 @@ def human_feedback(
|
||||
def _build_feedback_context(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> tuple[Any, Any]:
|
||||
"""Build the PendingFeedbackContext and resolve the effective provider."""
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
@@ -509,7 +488,6 @@ def human_feedback(
|
||||
return context, effective_provider
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console (sync)."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
@@ -535,7 +513,6 @@ def human_feedback(
|
||||
async def _request_feedback_async(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> str:
|
||||
"""Request feedback, awaiting the provider if it returns a coroutine."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
@@ -559,7 +536,6 @@ def human_feedback(
|
||||
method_output: Any,
|
||||
raw_feedback: str,
|
||||
) -> HumanFeedbackResult | str:
|
||||
"""Process feedback and return result or outcome."""
|
||||
collapsed_outcome: str | None = None
|
||||
|
||||
if not raw_feedback.strip():
|
||||
@@ -661,6 +637,10 @@ def human_feedback(
|
||||
"__condition_type__",
|
||||
"__trigger_condition__",
|
||||
"__is_flow_method__",
|
||||
"__flow_persistence_config__",
|
||||
"__is_router__",
|
||||
"__router_emit__",
|
||||
"__flow_method_definition__",
|
||||
]:
|
||||
if hasattr(func, attr):
|
||||
setattr(wrapper, attr, getattr(func, attr))
|
||||
@@ -681,7 +661,16 @@ def human_feedback(
|
||||
|
||||
if emit:
|
||||
wrapper.__is_router__ = True
|
||||
wrapper.__router_paths__ = list(emit)
|
||||
wrapper.__router_emit__ = list(emit)
|
||||
# Keep the definition fragment in sync: emit promotes the method to
|
||||
# a router and the feedback outcomes replace any emit recorded by an
|
||||
# inner @router. Copy before updating so the wrapped method's own
|
||||
# fragment (shared by reference) is left untouched.
|
||||
fragment = getattr(wrapper, "__flow_method_definition__", None)
|
||||
if isinstance(fragment, FlowMethodDefinition):
|
||||
wrapper.__flow_method_definition__ = fragment.model_copy(
|
||||
update={"router": True, "emit": list(emit)}
|
||||
)
|
||||
|
||||
# Stash the live LLM object for HITL resume to retrieve.
|
||||
# When a flow pauses for human feedback and later resumes (possibly in a
|
||||
@@ -689,7 +678,7 @@ def human_feedback(
|
||||
# By storing the original LLM on the wrapper, resume_async can retrieve
|
||||
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
|
||||
# instead of creating a bare LLM from just the model string.
|
||||
wrapper._hf_llm = llm
|
||||
wrapper._human_feedback_llm = llm
|
||||
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
|
||||
@@ -4,16 +4,9 @@ CrewAI Flow Persistence.
|
||||
This module provides interfaces and implementations for persisting flow states.
|
||||
"""
|
||||
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.persistence.decorators import persist
|
||||
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
|
||||
|
||||
|
||||
__all__ = ["FlowPersistence", "SQLiteFlowPersistence", "persist"]
|
||||
|
||||
StateType = TypeVar("StateType", bound=dict[str, Any] | BaseModel)
|
||||
DictStateType = dict[str, Any]
|
||||
|
||||
@@ -28,6 +28,7 @@ import asyncio
|
||||
from collections.abc import Callable
|
||||
import functools
|
||||
import logging
|
||||
from types import SimpleNamespace
|
||||
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
|
||||
|
||||
from crewai_core.printer import PRINTER
|
||||
@@ -44,6 +45,8 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
T = TypeVar("T")
|
||||
|
||||
__all__ = ["PersistenceDecorator", "persist"]
|
||||
|
||||
LOG_MESSAGES: Final[dict[str, str]] = {
|
||||
"save_state": "Saving flow state to memory for ID: {}",
|
||||
"save_error": "Failed to persist state for method {}: {}",
|
||||
@@ -52,6 +55,31 @@ LOG_MESSAGES: Final[dict[str, str]] = {
|
||||
}
|
||||
|
||||
|
||||
def _stamp_persistence_metadata(
|
||||
target: Any,
|
||||
persistence: FlowPersistence,
|
||||
verbose: bool,
|
||||
) -> None:
|
||||
target.__flow_persistence_config__ = SimpleNamespace(
|
||||
persistence=persistence,
|
||||
verbose=verbose,
|
||||
)
|
||||
|
||||
|
||||
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
|
||||
"__is_start_method__",
|
||||
"__trigger_methods__",
|
||||
"__condition_type__",
|
||||
"__trigger_condition__",
|
||||
"__is_router__",
|
||||
"__router_emit__",
|
||||
"__human_feedback_config__",
|
||||
"__flow_persistence_config__",
|
||||
"__flow_method_definition__",
|
||||
"_human_feedback_llm",
|
||||
)
|
||||
|
||||
|
||||
class PersistenceDecorator:
|
||||
"""Class to handle flow state persistence with consistent logging."""
|
||||
|
||||
@@ -163,10 +191,10 @@ def persist(
|
||||
"""
|
||||
|
||||
def decorator(target: type | Callable[..., T]) -> type | Callable[..., T]:
|
||||
"""Decorator that handles both class and method decoration."""
|
||||
actual_persistence = persistence or SQLiteFlowPersistence()
|
||||
|
||||
if isinstance(target, type):
|
||||
_stamp_persistence_metadata(target, actual_persistence, verbose)
|
||||
original_init = target.__init__ # type: ignore[misc]
|
||||
|
||||
@functools.wraps(original_init)
|
||||
@@ -211,12 +239,7 @@ def persist(
|
||||
|
||||
wrapped = create_async_wrapper(name, method)
|
||||
|
||||
for attr in [
|
||||
"__is_start_method__",
|
||||
"__trigger_methods__",
|
||||
"__condition_type__",
|
||||
"__is_router__",
|
||||
]:
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
@@ -239,12 +262,7 @@ def persist(
|
||||
|
||||
wrapped = create_sync_wrapper(name, method)
|
||||
|
||||
for attr in [
|
||||
"__is_start_method__",
|
||||
"__trigger_methods__",
|
||||
"__condition_type__",
|
||||
"__is_router__",
|
||||
]:
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(wrapped, attr, getattr(method, attr))
|
||||
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
@@ -254,6 +272,7 @@ def persist(
|
||||
return target
|
||||
method = target
|
||||
method.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method, actual_persistence, verbose)
|
||||
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
|
||||
@@ -271,15 +290,13 @@ def persist(
|
||||
)
|
||||
return cast(T, result)
|
||||
|
||||
for attr in [
|
||||
"__is_start_method__",
|
||||
"__trigger_methods__",
|
||||
"__condition_type__",
|
||||
"__is_router__",
|
||||
]:
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_async_wrapper, attr, getattr(method, attr))
|
||||
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(
|
||||
method_async_wrapper, actual_persistence, verbose
|
||||
)
|
||||
return cast(Callable[..., T], method_async_wrapper)
|
||||
|
||||
@functools.wraps(method)
|
||||
@@ -290,15 +307,11 @@ def persist(
|
||||
)
|
||||
return result
|
||||
|
||||
for attr in [
|
||||
"__is_start_method__",
|
||||
"__trigger_methods__",
|
||||
"__condition_type__",
|
||||
"__is_router__",
|
||||
]:
|
||||
for attr in _PRESERVED_FLOW_ATTRS:
|
||||
if hasattr(method, attr):
|
||||
setattr(method_sync_wrapper, attr, getattr(method, attr))
|
||||
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
|
||||
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
|
||||
return cast(Callable[..., T], method_sync_wrapper)
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
"""Flow runtime: the Flow execution engine, its metaclass, and state proxies.
|
||||
|
||||
Holds the Flow class (kickoff/resume/listener dispatch), the FlowMeta
|
||||
metaclass (Pydantic model construction; structural extraction is delegated to
|
||||
``flow_definition.extract_flow_definition``), and the thread-safe state
|
||||
proxies. The authoring decorators live in ``crewai.flow.dsl``.
|
||||
metaclass (Pydantic model construction; Python class extraction is delegated
|
||||
to ``dsl.extract_flow_definition``), and the thread-safe state proxies.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -85,17 +84,18 @@ from crewai.events.types.flow_events import (
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
from crewai.flow.flow_definition import (
|
||||
from crewai.flow.dsl import (
|
||||
_extract_all_methods,
|
||||
_extract_all_methods_recursive,
|
||||
_normalize_condition,
|
||||
build_flow_definition,
|
||||
extract_flow_definition,
|
||||
is_flow_condition_dict,
|
||||
is_flow_method,
|
||||
is_flow_method_name,
|
||||
is_simple_flow_condition,
|
||||
)
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
from crewai.flow.flow_definition import FlowDefinition
|
||||
from crewai.flow.flow_wrappers import (
|
||||
FlowCondition,
|
||||
FlowMethod,
|
||||
@@ -585,14 +585,17 @@ class FlowMeta(ModelMetaclass):
|
||||
|
||||
cls = super().__new__(mcs, name, bases, namespace)
|
||||
|
||||
start_methods, listeners, routers, router_paths = extract_flow_definition(
|
||||
start_methods, listeners, routers, router_emit = extract_flow_definition(
|
||||
namespace
|
||||
)
|
||||
|
||||
cls._start_methods = start_methods # type: ignore[attr-defined]
|
||||
cls._listeners = listeners # type: ignore[attr-defined]
|
||||
cls._routers = routers # type: ignore[attr-defined]
|
||||
cls._router_paths = router_paths # type: ignore[attr-defined]
|
||||
cls._router_emit = router_emit # type: ignore[attr-defined]
|
||||
# The static FlowDefinition is built lazily (on first access via
|
||||
# ``Flow.flow_definition()`` or visualization), not at class-definition
|
||||
# time, to avoid AST parsing and diagnostic logging on every import.
|
||||
|
||||
return cls
|
||||
|
||||
@@ -612,10 +615,20 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
_start_methods: ClassVar[list[FlowMethodName]] = []
|
||||
_listeners: ClassVar[dict[FlowMethodName, SimpleFlowCondition | FlowCondition]] = {}
|
||||
_routers: ClassVar[set[FlowMethodName]] = set()
|
||||
_router_paths: ClassVar[dict[FlowMethodName, list[FlowMethodName]]] = {}
|
||||
_router_emit: ClassVar[dict[FlowMethodName, list[FlowMethodName]]] = {}
|
||||
_flow_definition: ClassVar[FlowDefinition | None] = None
|
||||
|
||||
entity_type: Literal["flow"] = "flow"
|
||||
|
||||
@classmethod
|
||||
def flow_definition(cls) -> FlowDefinition:
|
||||
"""Return the static Flow Definition built from this Flow class."""
|
||||
flow_definition = cls.__dict__.get("_flow_definition")
|
||||
if flow_definition is None:
|
||||
flow_definition = build_flow_definition(cls)
|
||||
cls._flow_definition = flow_definition
|
||||
return flow_definition
|
||||
|
||||
initial_state: Annotated[ # type: ignore[type-arg]
|
||||
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
|
||||
BeforeValidator(_deserialize_initial_state),
|
||||
@@ -1293,7 +1306,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
llm = None
|
||||
method = self._methods.get(FlowMethodName(context.method_name))
|
||||
if method is not None:
|
||||
live_llm = getattr(method, "_hf_llm", None)
|
||||
live_llm = getattr(method, "_human_feedback_llm", None)
|
||||
if live_llm is not None:
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
|
||||
@@ -2618,7 +2631,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
Returns:
|
||||
True if the condition is satisfied, False otherwise
|
||||
"""
|
||||
if is_flow_method_name(condition):
|
||||
if isinstance(condition, str):
|
||||
return condition == trigger_method
|
||||
|
||||
if is_flow_condition_dict(condition):
|
||||
|
||||
@@ -22,7 +22,6 @@ P = ParamSpec("P")
|
||||
R = TypeVar("R", covariant=True)
|
||||
|
||||
FlowMethodName = NewType("FlowMethodName", str)
|
||||
FlowRouteName = NewType("FlowRouteName", str)
|
||||
PendingListenerKey = NewType(
|
||||
"PendingListenerKey",
|
||||
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
"""Backwards-compatible shim. The implementation moved to ``crewai.flow.flow_definition``.
|
||||
|
||||
Import from ``crewai.flow.flow_definition`` directly in new code.
|
||||
"""
|
||||
|
||||
from crewai.flow.flow_definition import (
|
||||
_extract_all_methods,
|
||||
_extract_all_methods_recursive,
|
||||
_extract_string_literals_from_type_annotation,
|
||||
_normalize_condition,
|
||||
_unwrap_function,
|
||||
build_ancestor_dict,
|
||||
build_parent_children_dict,
|
||||
calculate_node_levels,
|
||||
count_outgoing_edges,
|
||||
dfs_ancestors,
|
||||
extract_flow_definition,
|
||||
get_child_index,
|
||||
get_possible_return_constants,
|
||||
is_ancestor,
|
||||
is_flow_condition_dict,
|
||||
is_flow_condition_list,
|
||||
is_flow_method,
|
||||
is_flow_method_callable,
|
||||
is_flow_method_name,
|
||||
is_simple_flow_condition,
|
||||
process_router_paths,
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"_extract_all_methods",
|
||||
"_extract_all_methods_recursive",
|
||||
"_extract_string_literals_from_type_annotation",
|
||||
"_normalize_condition",
|
||||
"_unwrap_function",
|
||||
"build_ancestor_dict",
|
||||
"build_parent_children_dict",
|
||||
"calculate_node_levels",
|
||||
"count_outgoing_edges",
|
||||
"dfs_ancestors",
|
||||
"extract_flow_definition",
|
||||
"get_child_index",
|
||||
"get_possible_return_constants",
|
||||
"is_ancestor",
|
||||
"is_flow_condition_dict",
|
||||
"is_flow_condition_list",
|
||||
"is_flow_method",
|
||||
"is_flow_method_callable",
|
||||
"is_flow_method_name",
|
||||
"is_simple_flow_condition",
|
||||
"process_router_paths",
|
||||
]
|
||||
@@ -684,7 +684,7 @@ class TriggeredByHighlighter {
|
||||
});
|
||||
} else {
|
||||
for (const [nodeName, nodeInfo] of Object.entries(nodeData)) {
|
||||
if (nodeInfo.router_paths && nodeInfo.router_paths.includes(triggerNodeId)) {
|
||||
if (nodeInfo.router_events && nodeInfo.router_events.includes(triggerNodeId)) {
|
||||
const routerNode = nodeName;
|
||||
|
||||
const routerEdges = allEdges.filter(
|
||||
@@ -768,7 +768,7 @@ class TriggeredByHighlighter {
|
||||
this.animateEdgeStyles();
|
||||
}
|
||||
|
||||
highlightAllRouterPaths() {
|
||||
highlightAllRouterEvents() {
|
||||
this.clear();
|
||||
|
||||
if (!this.activeDrawerNodeId) {
|
||||
@@ -792,10 +792,10 @@ class TriggeredByHighlighter {
|
||||
routerEdges.forEach(edge => {
|
||||
pathNodes.add(edge.to);
|
||||
});
|
||||
} else if (activeMetadata && activeMetadata.router_paths && activeMetadata.router_paths.length > 0) {
|
||||
activeMetadata.router_paths.forEach(pathName => {
|
||||
} else if (activeMetadata && activeMetadata.router_events && activeMetadata.router_events.length > 0) {
|
||||
activeMetadata.router_events.forEach(eventName => {
|
||||
for (const [nodeName, nodeInfo] of Object.entries(nodeData)) {
|
||||
if (nodeInfo.router_paths && nodeInfo.router_paths.includes(pathName)) {
|
||||
if (nodeInfo.router_events && nodeInfo.router_events.includes(eventName)) {
|
||||
const edgeFromRouter = allEdges.filter(
|
||||
(edge) => edge.from === nodeName && edge.to === this.activeDrawerNodeId && edge.dashes
|
||||
);
|
||||
@@ -892,8 +892,8 @@ class TriggeredByHighlighter {
|
||||
) {
|
||||
for (const [nodeName, nodeInfo] of Object.entries(nodeData)) {
|
||||
if (
|
||||
nodeInfo.router_paths &&
|
||||
nodeInfo.router_paths.includes(triggerNodeId)
|
||||
nodeInfo.router_events &&
|
||||
nodeInfo.router_events.includes(triggerNodeId)
|
||||
) {
|
||||
const routerNode = nodeName;
|
||||
|
||||
@@ -1501,7 +1501,7 @@ class DrawerManager {
|
||||
const activeMetadata = nodeData[activeNodeId];
|
||||
if (activeMetadata && activeMetadata.trigger_methods && activeMetadata.trigger_methods.includes(triggerNodeId)) {
|
||||
for (const [nodeName, nodeInfo] of Object.entries(nodeData)) {
|
||||
if (nodeInfo.router_paths && nodeInfo.router_paths.includes(triggerNodeId)) {
|
||||
if (nodeInfo.router_events && nodeInfo.router_events.includes(triggerNodeId)) {
|
||||
const routerEdges = allEdges.filter(
|
||||
(edge) => edge.from === nodeName && edge.dashes
|
||||
);
|
||||
@@ -1660,16 +1660,16 @@ class DrawerManager {
|
||||
`;
|
||||
}
|
||||
|
||||
if (metadata.router_paths && metadata.router_paths.length > 0) {
|
||||
const uniqueRouterPaths = [...new Set(metadata.router_paths)];
|
||||
const routerPathsJson = JSON.stringify(uniqueRouterPaths).replace(/"/g, '"');
|
||||
if (metadata.router_events && metadata.router_events.length > 0) {
|
||||
const uniqueRouterEvents = [...new Set(metadata.router_events)];
|
||||
const routerEventsJson = JSON.stringify(uniqueRouterEvents).replace(/"/g, '"');
|
||||
metadataContent += `
|
||||
<div class="drawer-section">
|
||||
<div class="drawer-section-title router-paths-title" data-router-paths="${routerPathsJson}" style="cursor: pointer; display: inline-flex; align-items: center; gap: 4px;">
|
||||
Router Paths <i data-lucide="chevron-down" style="width: 14px; height: 14px; color: var(--text-primary);"></i>
|
||||
<div class="drawer-section-title router-events-title" data-router-events="${routerEventsJson}" style="cursor: pointer; display: inline-flex; align-items: center; gap: 4px;">
|
||||
Router Events <i data-lucide="chevron-down" style="width: 14px; height: 14px; color: var(--text-primary);"></i>
|
||||
</div>
|
||||
<ul class="drawer-list">
|
||||
${uniqueRouterPaths.map((p) => `<li><span class="drawer-code-link" data-node-id="${p}" style="color: {{ CREWAI_ORANGE }}; border-color: rgba(255,90,80,0.3);">${p}</span></li>`).join("")}
|
||||
${uniqueRouterEvents.map((eventName) => `<li><span class="drawer-code-link" data-node-id="${eventName}" style="color: {{ CREWAI_ORANGE }}; border-color: rgba(255,90,80,0.3);">${eventName}</span></li>`).join("")}
|
||||
</ul>
|
||||
</div>
|
||||
`;
|
||||
@@ -1823,14 +1823,14 @@ class DrawerManager {
|
||||
});
|
||||
});
|
||||
|
||||
const routerPathsTitle = this.elements.content.querySelector(
|
||||
".router-paths-title[data-router-paths]",
|
||||
const routerEventsTitle = this.elements.content.querySelector(
|
||||
".router-events-title[data-router-events]",
|
||||
);
|
||||
if (routerPathsTitle) {
|
||||
routerPathsTitle.addEventListener("click", (e) => {
|
||||
if (routerEventsTitle) {
|
||||
routerEventsTitle.addEventListener("click", (e) => {
|
||||
e.preventDefault();
|
||||
e.stopPropagation();
|
||||
this.triggeredByHighlighter.highlightAllRouterPaths();
|
||||
this.triggeredByHighlighter.highlightAllRouterEvents();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,131 +1,118 @@
|
||||
"""Flow structure builder for analyzing Flow execution."""
|
||||
"""Flow structure builder for definition-only Flow visualization."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
import inspect
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
|
||||
from crewai.flow.flow_wrappers import FlowCondition
|
||||
from crewai.flow.types import FlowMethodName
|
||||
from crewai.flow.utils import (
|
||||
is_flow_condition_dict,
|
||||
is_simple_flow_condition,
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowDefinition,
|
||||
FlowDefinitionCondition,
|
||||
FlowMethodDefinition,
|
||||
)
|
||||
from crewai.flow.visualization.schema import extract_method_signature
|
||||
from crewai.flow.visualization.types import FlowStructure, NodeMetadata, StructureEdge
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = ["build_flow_structure", "calculate_execution_paths"]
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.flow import Flow
|
||||
|
||||
|
||||
def _definition_condition_items(
|
||||
condition: dict[str, Any],
|
||||
key: str,
|
||||
) -> list[FlowDefinitionCondition]:
|
||||
return cast(list[FlowDefinitionCondition], condition.get(key, []))
|
||||
|
||||
|
||||
def _definition_condition_parts(
|
||||
condition: dict[str, Any],
|
||||
) -> tuple[str, list[FlowDefinitionCondition]]:
|
||||
if "and" in condition:
|
||||
return AND_CONDITION, _definition_condition_items(condition, "and")
|
||||
return OR_CONDITION, _definition_condition_items(condition, "or")
|
||||
|
||||
|
||||
def _condition_type_from_definition(
|
||||
condition: FlowDefinitionCondition | None,
|
||||
) -> str | None:
|
||||
if isinstance(condition, dict):
|
||||
if "and" in condition:
|
||||
return AND_CONDITION
|
||||
if "or" in condition:
|
||||
return OR_CONDITION
|
||||
if isinstance(condition, str):
|
||||
return OR_CONDITION
|
||||
return None
|
||||
|
||||
|
||||
def _runtime_condition_from_definition(
|
||||
condition: FlowDefinitionCondition,
|
||||
) -> str | dict[str, Any]:
|
||||
if isinstance(condition, str):
|
||||
return condition
|
||||
condition_type, conditions = _definition_condition_parts(condition)
|
||||
return {
|
||||
"type": condition_type,
|
||||
"conditions": [_runtime_condition_from_definition(item) for item in conditions],
|
||||
}
|
||||
|
||||
|
||||
def _method_trigger_condition(
|
||||
method_definition: FlowMethodDefinition,
|
||||
) -> FlowDefinitionCondition | None:
|
||||
if method_definition.listen is not None:
|
||||
return method_definition.listen
|
||||
if isinstance(method_definition.start, str | dict):
|
||||
return method_definition.start
|
||||
return None
|
||||
|
||||
|
||||
def _method_router_events(method_definition: FlowMethodDefinition) -> list[str]:
|
||||
if method_definition.human_feedback and method_definition.human_feedback.emit:
|
||||
return [str(event) for event in method_definition.human_feedback.emit]
|
||||
if method_definition.emit:
|
||||
return [str(event) for event in method_definition.emit]
|
||||
return []
|
||||
|
||||
|
||||
def _extract_direct_or_triggers(
|
||||
condition: str | dict[str, Any] | list[Any] | FlowCondition,
|
||||
condition: FlowDefinitionCondition,
|
||||
) -> list[str]:
|
||||
"""Extract direct OR-level trigger strings from a condition.
|
||||
|
||||
This function extracts strings that would directly trigger a listener,
|
||||
meaning they appear at the top level of an OR condition. Strings nested
|
||||
inside AND conditions are NOT considered direct triggers for router paths.
|
||||
|
||||
For example:
|
||||
- or_("a", "b") -> ["a", "b"] (both are direct triggers)
|
||||
- and_("a", "b") -> [] (neither are direct triggers, both required)
|
||||
- or_(and_("a", "b"), "c") -> ["c"] (only "c" is a direct trigger)
|
||||
|
||||
Args:
|
||||
condition: Can be a string, dict, or list.
|
||||
|
||||
Returns:
|
||||
List of direct OR-level trigger strings.
|
||||
"""
|
||||
if isinstance(condition, str):
|
||||
return [condition]
|
||||
if isinstance(condition, dict):
|
||||
cond_type = condition.get("type", OR_CONDITION)
|
||||
conditions_list = condition.get("conditions", [])
|
||||
|
||||
if cond_type == OR_CONDITION:
|
||||
strings = []
|
||||
for sub_cond in conditions_list:
|
||||
strings.extend(_extract_direct_or_triggers(sub_cond))
|
||||
return strings
|
||||
condition_type, conditions = _definition_condition_parts(condition)
|
||||
if condition_type == AND_CONDITION:
|
||||
return []
|
||||
if isinstance(condition, list):
|
||||
strings = []
|
||||
for item in condition:
|
||||
strings.extend(_extract_direct_or_triggers(item))
|
||||
return strings
|
||||
if callable(condition) and hasattr(condition, "__name__"):
|
||||
return [condition.__name__]
|
||||
return []
|
||||
strings: list[str] = []
|
||||
for sub_condition in conditions:
|
||||
strings.extend(_extract_direct_or_triggers(sub_condition))
|
||||
return strings
|
||||
|
||||
|
||||
def _extract_all_trigger_names(
|
||||
condition: str | dict[str, Any] | list[Any] | FlowCondition,
|
||||
condition: FlowDefinitionCondition,
|
||||
) -> list[str]:
|
||||
"""Extract ALL trigger names from a condition for display purposes.
|
||||
|
||||
Unlike _extract_direct_or_triggers, this extracts ALL strings and method
|
||||
names from the entire condition tree, including those nested in AND conditions.
|
||||
This is used for displaying trigger information in the UI.
|
||||
|
||||
For example:
|
||||
- or_("a", "b") -> ["a", "b"]
|
||||
- and_("a", "b") -> ["a", "b"]
|
||||
- or_(and_("a", method_6), method_4) -> ["a", "method_6", "method_4"]
|
||||
|
||||
Args:
|
||||
condition: Can be a string, dict, or list.
|
||||
|
||||
Returns:
|
||||
List of all trigger names found in the condition.
|
||||
"""
|
||||
if isinstance(condition, str):
|
||||
return [condition]
|
||||
if isinstance(condition, dict):
|
||||
conditions_list = condition.get("conditions", [])
|
||||
strings = []
|
||||
for sub_cond in conditions_list:
|
||||
strings.extend(_extract_all_trigger_names(sub_cond))
|
||||
return strings
|
||||
if isinstance(condition, list):
|
||||
strings = []
|
||||
for item in condition:
|
||||
strings.extend(_extract_all_trigger_names(item))
|
||||
return strings
|
||||
if callable(condition) and hasattr(condition, "__name__"):
|
||||
return [condition.__name__]
|
||||
return []
|
||||
_, conditions = _definition_condition_parts(condition)
|
||||
strings: list[str] = []
|
||||
for sub_condition in conditions:
|
||||
strings.extend(_extract_all_trigger_names(sub_condition))
|
||||
return strings
|
||||
|
||||
|
||||
def _create_edges_from_condition(
|
||||
condition: str | dict[str, Any] | list[Any] | FlowCondition,
|
||||
condition: FlowDefinitionCondition,
|
||||
target: str,
|
||||
nodes: dict[str, NodeMetadata],
|
||||
) -> list[StructureEdge]:
|
||||
"""Create edges from a condition tree, preserving AND/OR semantics.
|
||||
|
||||
This function recursively processes the condition tree and creates edges
|
||||
with the appropriate condition_type for each trigger.
|
||||
|
||||
For AND conditions, all triggers get edges with condition_type="AND".
|
||||
For OR conditions, triggers get edges with condition_type="OR".
|
||||
|
||||
Args:
|
||||
condition: The condition tree (string, dict, or list).
|
||||
target: The target node name.
|
||||
nodes: Dictionary of all nodes for validation.
|
||||
|
||||
Returns:
|
||||
List of StructureEdge objects representing the condition.
|
||||
"""
|
||||
edges: list[StructureEdge] = []
|
||||
|
||||
if isinstance(condition, str):
|
||||
@@ -135,24 +122,11 @@ def _create_edges_from_condition(
|
||||
source=condition,
|
||||
target=target,
|
||||
condition_type=OR_CONDITION,
|
||||
is_router_path=False,
|
||||
)
|
||||
)
|
||||
elif callable(condition) and hasattr(condition, "__name__"):
|
||||
method_name = condition.__name__
|
||||
if method_name in nodes:
|
||||
edges.append(
|
||||
StructureEdge(
|
||||
source=method_name,
|
||||
target=target,
|
||||
condition_type=OR_CONDITION,
|
||||
is_router_path=False,
|
||||
is_router_event=False,
|
||||
)
|
||||
)
|
||||
elif isinstance(condition, dict):
|
||||
cond_type = condition.get("type", OR_CONDITION)
|
||||
conditions_list = condition.get("conditions", [])
|
||||
|
||||
cond_type, conditions = _definition_condition_parts(condition)
|
||||
if cond_type == AND_CONDITION:
|
||||
triggers = _extract_all_trigger_names(condition)
|
||||
edges.extend(
|
||||
@@ -160,277 +134,144 @@ def _create_edges_from_condition(
|
||||
source=trigger,
|
||||
target=target,
|
||||
condition_type=AND_CONDITION,
|
||||
is_router_path=False,
|
||||
is_router_event=False,
|
||||
)
|
||||
for trigger in triggers
|
||||
if trigger in nodes
|
||||
)
|
||||
else:
|
||||
for sub_cond in conditions_list:
|
||||
edges.extend(_create_edges_from_condition(sub_cond, target, nodes))
|
||||
elif isinstance(condition, list):
|
||||
for item in condition:
|
||||
edges.extend(_create_edges_from_condition(item, target, nodes))
|
||||
for sub_condition in conditions:
|
||||
edges.extend(_create_edges_from_condition(sub_condition, target, nodes))
|
||||
|
||||
return edges
|
||||
|
||||
|
||||
def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
|
||||
"""Build a structure representation of a Flow's execution.
|
||||
def _flow_definition_from(
|
||||
flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
|
||||
) -> FlowDefinition:
|
||||
if isinstance(flow_or_definition, FlowDefinition):
|
||||
return flow_or_definition
|
||||
|
||||
Args:
|
||||
flow: Flow instance to analyze.
|
||||
flow_class = (
|
||||
flow_or_definition
|
||||
if isinstance(flow_or_definition, type)
|
||||
else type(flow_or_definition)
|
||||
)
|
||||
flow_definition = getattr(flow_class, "flow_definition", None)
|
||||
if callable(flow_definition):
|
||||
return cast(FlowDefinition, flow_definition())
|
||||
raise TypeError(
|
||||
"build_flow_structure requires a FlowDefinition or a Flow class/instance "
|
||||
"with flow_definition()."
|
||||
)
|
||||
|
||||
Returns:
|
||||
Dictionary with nodes, edges, start_methods, and router_methods.
|
||||
"""
|
||||
|
||||
def build_flow_structure(
|
||||
flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
|
||||
) -> FlowStructure:
|
||||
"""Build a visualization structure projection from a FlowDefinition."""
|
||||
definition = _flow_definition_from(flow_or_definition)
|
||||
nodes: dict[str, NodeMetadata] = {}
|
||||
edges: list[StructureEdge] = []
|
||||
start_methods: list[str] = []
|
||||
router_methods: list[str] = []
|
||||
|
||||
for method_name, method in flow._methods.items():
|
||||
node_metadata: NodeMetadata = {"type": "listen"}
|
||||
for method_name, method_definition in definition.methods.items():
|
||||
node_metadata: NodeMetadata = {"type": "listen", "class_name": definition.name}
|
||||
|
||||
if hasattr(method, "__is_start_method__") and method.__is_start_method__:
|
||||
if method_definition.is_start:
|
||||
node_metadata["type"] = "start"
|
||||
start_methods.append(method_name)
|
||||
|
||||
if hasattr(method, "__is_router__") and method.__is_router__:
|
||||
if method_definition.router:
|
||||
node_metadata["is_router"] = True
|
||||
node_metadata["type"] = "router"
|
||||
router_methods.append(method_name)
|
||||
router_events = _method_router_events(method_definition)
|
||||
if router_events:
|
||||
node_metadata["router_events"] = router_events
|
||||
|
||||
if method_name in flow._router_paths:
|
||||
node_metadata["router_paths"] = [
|
||||
str(p) for p in flow._router_paths[method_name]
|
||||
]
|
||||
|
||||
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
|
||||
node_metadata["trigger_methods"] = [
|
||||
str(m) for m in method.__trigger_methods__
|
||||
]
|
||||
|
||||
if hasattr(method, "__condition_type__") and method.__condition_type__:
|
||||
node_metadata["trigger_condition_type"] = method.__condition_type__
|
||||
if "condition_type" not in node_metadata:
|
||||
node_metadata["condition_type"] = method.__condition_type__
|
||||
trigger_condition = _method_trigger_condition(method_definition)
|
||||
condition_type = _condition_type_from_definition(trigger_condition)
|
||||
if condition_type is not None and trigger_condition is not None:
|
||||
node_metadata["trigger_condition_type"] = condition_type
|
||||
node_metadata["condition_type"] = condition_type
|
||||
extracted = _extract_all_trigger_names(trigger_condition)
|
||||
if extracted:
|
||||
node_metadata["trigger_methods"] = extracted
|
||||
runtime_condition = _runtime_condition_from_definition(trigger_condition)
|
||||
if isinstance(runtime_condition, dict):
|
||||
node_metadata["trigger_condition"] = runtime_condition
|
||||
|
||||
if node_metadata.get("is_router") and "condition_type" not in node_metadata:
|
||||
node_metadata["condition_type"] = "IF"
|
||||
|
||||
if (
|
||||
hasattr(method, "__trigger_condition__")
|
||||
and method.__trigger_condition__ is not None
|
||||
):
|
||||
node_metadata["trigger_condition"] = method.__trigger_condition__
|
||||
|
||||
if "trigger_methods" not in node_metadata:
|
||||
extracted = _extract_all_trigger_names(method.__trigger_condition__)
|
||||
if extracted:
|
||||
node_metadata["trigger_methods"] = extracted
|
||||
|
||||
node_metadata["method_signature"] = extract_method_signature(
|
||||
method, method_name
|
||||
)
|
||||
|
||||
try:
|
||||
source_code = inspect.getsource(method)
|
||||
node_metadata["source_code"] = source_code
|
||||
|
||||
try:
|
||||
source_lines, start_line = inspect.getsourcelines(method)
|
||||
node_metadata["source_lines"] = source_lines
|
||||
node_metadata["source_start_line"] = start_line
|
||||
except (OSError, TypeError):
|
||||
pass
|
||||
|
||||
try:
|
||||
source_file = inspect.getsourcefile(method)
|
||||
if source_file:
|
||||
node_metadata["source_file"] = source_file
|
||||
except (OSError, TypeError):
|
||||
try:
|
||||
class_file = inspect.getsourcefile(flow.__class__)
|
||||
if class_file:
|
||||
node_metadata["source_file"] = class_file
|
||||
except (OSError, TypeError):
|
||||
pass
|
||||
except (OSError, TypeError):
|
||||
pass
|
||||
|
||||
try:
|
||||
class_obj = flow.__class__
|
||||
|
||||
if class_obj:
|
||||
class_name = class_obj.__name__
|
||||
|
||||
bases = class_obj.__bases__
|
||||
if bases:
|
||||
base_strs = []
|
||||
for base in bases:
|
||||
if hasattr(base, "__name__"):
|
||||
if hasattr(base, "__origin__"):
|
||||
base_strs.append(str(base))
|
||||
else:
|
||||
base_strs.append(base.__name__)
|
||||
else:
|
||||
base_strs.append(str(base))
|
||||
|
||||
try:
|
||||
source_lines = inspect.getsource(class_obj).split("\n")
|
||||
_, class_start_line = inspect.getsourcelines(class_obj)
|
||||
|
||||
for idx, line in enumerate(source_lines):
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("class ") and class_name in stripped:
|
||||
class_signature = stripped.rstrip(":")
|
||||
node_metadata["class_signature"] = class_signature
|
||||
node_metadata["class_line_number"] = (
|
||||
class_start_line + idx
|
||||
)
|
||||
break
|
||||
except (OSError, TypeError):
|
||||
class_signature = f"class {class_name}({', '.join(base_strs)})"
|
||||
node_metadata["class_signature"] = class_signature
|
||||
else:
|
||||
class_signature = f"class {class_name}"
|
||||
node_metadata["class_signature"] = class_signature
|
||||
|
||||
node_metadata["class_name"] = class_name
|
||||
except (OSError, TypeError, AttributeError):
|
||||
pass
|
||||
|
||||
nodes[method_name] = node_metadata
|
||||
|
||||
for listener_name, condition_data in flow._listeners.items():
|
||||
if listener_name in router_methods:
|
||||
for method_name, method_definition in definition.methods.items():
|
||||
trigger_condition = _method_trigger_condition(method_definition)
|
||||
if trigger_condition is None:
|
||||
continue
|
||||
|
||||
if is_simple_flow_condition(condition_data):
|
||||
cond_type, methods = condition_data
|
||||
edges.extend(
|
||||
StructureEdge(
|
||||
source=str(trigger_method),
|
||||
target=str(listener_name),
|
||||
condition_type=cond_type,
|
||||
is_router_path=False,
|
||||
)
|
||||
for trigger_method in methods
|
||||
if str(trigger_method) in nodes
|
||||
)
|
||||
elif is_flow_condition_dict(condition_data):
|
||||
edges.extend(
|
||||
_create_edges_from_condition(condition_data, str(listener_name), nodes)
|
||||
)
|
||||
|
||||
for method_name, node_metadata in nodes.items(): # type: ignore[assignment]
|
||||
if node_metadata.get("is_router") and "trigger_methods" in node_metadata:
|
||||
trigger_methods = node_metadata["trigger_methods"]
|
||||
condition_type = node_metadata.get("trigger_condition_type", OR_CONDITION)
|
||||
|
||||
if "trigger_condition" in node_metadata:
|
||||
edges.extend(
|
||||
_create_edges_from_condition(
|
||||
node_metadata["trigger_condition"], # type: ignore[arg-type]
|
||||
method_name,
|
||||
nodes,
|
||||
)
|
||||
)
|
||||
else:
|
||||
edges.extend(
|
||||
StructureEdge(
|
||||
source=trigger_method,
|
||||
target=method_name,
|
||||
condition_type=condition_type,
|
||||
is_router_path=False,
|
||||
)
|
||||
for trigger_method in trigger_methods
|
||||
if trigger_method in nodes
|
||||
)
|
||||
edges.extend(
|
||||
_create_edges_from_condition(trigger_condition, method_name, nodes)
|
||||
)
|
||||
|
||||
all_string_triggers: set[str] = set()
|
||||
for condition_data in flow._listeners.values():
|
||||
if is_simple_flow_condition(condition_data):
|
||||
_, methods = condition_data
|
||||
for m in methods:
|
||||
if str(m) not in nodes: # It's a string trigger, not a method name
|
||||
all_string_triggers.add(str(m))
|
||||
elif is_flow_condition_dict(condition_data):
|
||||
for trigger in _extract_direct_or_triggers(condition_data):
|
||||
if trigger not in nodes:
|
||||
all_string_triggers.add(trigger)
|
||||
for method_definition in definition.methods.values():
|
||||
trigger_condition = _method_trigger_condition(method_definition)
|
||||
if trigger_condition is None:
|
||||
continue
|
||||
for trigger in _extract_direct_or_triggers(trigger_condition):
|
||||
if trigger not in nodes:
|
||||
all_string_triggers.add(trigger)
|
||||
|
||||
all_router_outputs: set[str] = set()
|
||||
all_router_events: set[str] = set()
|
||||
for router_method_name in router_methods:
|
||||
if router_method_name not in flow._router_paths:
|
||||
flow._router_paths[FlowMethodName(router_method_name)] = []
|
||||
router_events = _method_router_events(definition.methods[router_method_name])
|
||||
if router_events and router_method_name in nodes:
|
||||
nodes[router_method_name]["router_events"] = router_events
|
||||
all_router_events.update(router_events)
|
||||
|
||||
current_paths = flow._router_paths.get(FlowMethodName(router_method_name), [])
|
||||
if current_paths and router_method_name in nodes:
|
||||
nodes[router_method_name]["router_paths"] = [str(p) for p in current_paths]
|
||||
all_router_outputs.update(str(p) for p in current_paths)
|
||||
|
||||
if not current_paths:
|
||||
if not router_events:
|
||||
logger.warning(
|
||||
f"Could not determine return paths for router '{router_method_name}'. "
|
||||
f"Add a return type annotation like "
|
||||
f"'-> Literal[\"path1\", \"path2\"]' or '-> YourEnum' "
|
||||
f"to enable proper flow visualization."
|
||||
f"Router events for '{router_method_name}' are dynamic or not "
|
||||
f"statically inferable; static visualization may omit event edges."
|
||||
)
|
||||
|
||||
orphaned_triggers = all_string_triggers - all_router_outputs
|
||||
orphaned_triggers = all_string_triggers - all_router_events
|
||||
if orphaned_triggers:
|
||||
logger.error(
|
||||
f"Found listeners waiting for triggers {orphaned_triggers} "
|
||||
f"but no router outputs these values explicitly. "
|
||||
f"If your router returns a non-static value, check that your router has proper return type annotations."
|
||||
logger.warning(
|
||||
f"Static visualization could not match listener triggers "
|
||||
f"{orphaned_triggers} to explicit router events. "
|
||||
f"Dynamic router values may still trigger these listeners at runtime."
|
||||
)
|
||||
|
||||
for router_method_name in router_methods:
|
||||
if router_method_name not in flow._router_paths:
|
||||
continue
|
||||
router_events = _method_router_events(definition.methods[router_method_name])
|
||||
|
||||
router_paths = flow._router_paths[FlowMethodName(router_method_name)]
|
||||
|
||||
for path in router_paths:
|
||||
for listener_name, condition_data in flow._listeners.items():
|
||||
for event in router_events:
|
||||
for listener_name, method_definition in definition.methods.items():
|
||||
if listener_name == router_method_name:
|
||||
continue
|
||||
|
||||
trigger_strings_from_cond: list[str] = []
|
||||
trigger_condition = _method_trigger_condition(method_definition)
|
||||
if trigger_condition is None:
|
||||
continue
|
||||
trigger_strings_from_cond = _extract_direct_or_triggers(
|
||||
trigger_condition
|
||||
)
|
||||
|
||||
if is_simple_flow_condition(condition_data):
|
||||
_, methods = condition_data
|
||||
trigger_strings_from_cond = [str(m) for m in methods]
|
||||
elif is_flow_condition_dict(condition_data):
|
||||
trigger_strings_from_cond = _extract_direct_or_triggers(
|
||||
condition_data
|
||||
)
|
||||
|
||||
if str(path) in trigger_strings_from_cond:
|
||||
if str(event) in trigger_strings_from_cond:
|
||||
edges.append(
|
||||
StructureEdge(
|
||||
source=router_method_name,
|
||||
target=str(listener_name),
|
||||
target=listener_name,
|
||||
condition_type=None,
|
||||
is_router_path=True,
|
||||
router_path_label=str(path),
|
||||
is_router_event=True,
|
||||
router_event=str(event),
|
||||
)
|
||||
)
|
||||
|
||||
for start_method in flow._start_methods:
|
||||
if start_method not in nodes and start_method in flow._methods:
|
||||
method = flow._methods[start_method]
|
||||
nodes[str(start_method)] = NodeMetadata(type="start")
|
||||
|
||||
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
|
||||
nodes[str(start_method)]["trigger_methods"] = [
|
||||
str(m) for m in method.__trigger_methods__
|
||||
]
|
||||
if hasattr(method, "__condition_type__") and method.__condition_type__:
|
||||
nodes[str(start_method)]["condition_type"] = method.__condition_type__
|
||||
|
||||
return FlowStructure(
|
||||
nodes=nodes,
|
||||
edges=edges,
|
||||
@@ -453,7 +294,7 @@ def calculate_execution_paths(structure: FlowStructure) -> int:
|
||||
graph[edge["source"]].append(
|
||||
{
|
||||
"target": edge["target"],
|
||||
"is_router": edge["is_router_path"],
|
||||
"is_router": edge["is_router_event"],
|
||||
"condition": edge["condition_type"],
|
||||
}
|
||||
)
|
||||
@@ -466,15 +307,6 @@ def calculate_execution_paths(structure: FlowStructure) -> int:
|
||||
return 0
|
||||
|
||||
def count_paths_from(node: str, visited: set[str]) -> int:
|
||||
"""Recursively count execution paths from a given node.
|
||||
|
||||
Args:
|
||||
node: Node name to start counting from.
|
||||
visited: Set of already visited nodes to prevent cycles.
|
||||
|
||||
Returns:
|
||||
Number of execution paths from this node to terminal nodes.
|
||||
"""
|
||||
if node in terminal_nodes:
|
||||
return 1
|
||||
|
||||
|
||||
@@ -309,18 +309,18 @@ def render_interactive(
|
||||
</div>
|
||||
""")
|
||||
|
||||
if metadata.get("router_paths"):
|
||||
paths = metadata["router_paths"]
|
||||
paths_items = "".join(
|
||||
if metadata.get("router_events"):
|
||||
router_events = metadata["router_events"]
|
||||
event_items = "".join(
|
||||
[
|
||||
f'<li style="margin: 3px 0;"><code style="background: rgba(255,90,80,0.08); padding: 2px 6px; border-radius: 3px; font-size: 10px; color: {CREWAI_ORANGE}; border: 1px solid rgba(255,90,80,0.2); font-weight: 600;">{p}</code></li>'
|
||||
for p in paths
|
||||
for p in router_events
|
||||
]
|
||||
)
|
||||
title_parts.append(f"""
|
||||
<div>
|
||||
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">Router Paths</div>
|
||||
<ul style="list-style: none; padding: 0; margin: 0;">{paths_items}</ul>
|
||||
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">Router Events</div>
|
||||
<ul style="list-style: none; padding: 0; margin: 0;">{event_items}</ul>
|
||||
</div>
|
||||
""")
|
||||
|
||||
@@ -364,11 +364,11 @@ def render_interactive(
|
||||
edge_color: str = GRAY
|
||||
edge_dashes: bool | list[int] = False
|
||||
|
||||
if edge["is_router_path"]:
|
||||
if edge["is_router_event"]:
|
||||
edge_color = CREWAI_ORANGE
|
||||
edge_dashes = [15, 10]
|
||||
if "router_path_label" in edge:
|
||||
edge_label = edge["router_path_label"]
|
||||
if "router_event" in edge:
|
||||
edge_label = edge["router_event"]
|
||||
elif edge["condition_type"] == "AND":
|
||||
edge_label = "AND"
|
||||
edge_color = CREWAI_ORANGE
|
||||
|
||||
@@ -1,104 +0,0 @@
|
||||
"""OpenAPI schema conversion utilities for Flow methods."""
|
||||
|
||||
import inspect
|
||||
from typing import Any, get_args, get_origin
|
||||
|
||||
|
||||
def type_to_openapi_schema(type_hint: Any) -> dict[str, Any]:
|
||||
"""Convert Python type hint to OpenAPI schema.
|
||||
|
||||
Args:
|
||||
type_hint: Python type hint to convert.
|
||||
|
||||
Returns:
|
||||
OpenAPI schema dictionary.
|
||||
"""
|
||||
if type_hint is inspect.Parameter.empty:
|
||||
return {}
|
||||
|
||||
if type_hint is None or type_hint is type(None):
|
||||
return {"type": "null"}
|
||||
|
||||
if hasattr(type_hint, "__module__") and hasattr(type_hint, "__name__"):
|
||||
if type_hint.__module__ == "typing" and type_hint.__name__ == "Any":
|
||||
return {}
|
||||
|
||||
type_str = str(type_hint)
|
||||
if type_str == "typing.Any" or type_str == "<class 'typing.Any'>":
|
||||
return {}
|
||||
|
||||
if isinstance(type_hint, str):
|
||||
return {"type": type_hint}
|
||||
|
||||
origin = get_origin(type_hint)
|
||||
args = get_args(type_hint)
|
||||
|
||||
if type_hint is str:
|
||||
return {"type": "string"}
|
||||
if type_hint is int:
|
||||
return {"type": "integer"}
|
||||
if type_hint is float:
|
||||
return {"type": "number"}
|
||||
if type_hint is bool:
|
||||
return {"type": "boolean"}
|
||||
if type_hint is dict or origin is dict:
|
||||
if args and len(args) > 1:
|
||||
return {
|
||||
"type": "object",
|
||||
"additionalProperties": type_to_openapi_schema(args[1]),
|
||||
}
|
||||
return {"type": "object"}
|
||||
if type_hint is list or origin is list:
|
||||
if args:
|
||||
return {"type": "array", "items": type_to_openapi_schema(args[0])}
|
||||
return {"type": "array"}
|
||||
if hasattr(type_hint, "__name__"):
|
||||
return {"type": "object", "className": type_hint.__name__}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def extract_method_signature(method: Any, method_name: str) -> dict[str, Any]:
|
||||
"""Extract method signature as OpenAPI schema with documentation.
|
||||
|
||||
Args:
|
||||
method: Method to analyze.
|
||||
method_name: Method name.
|
||||
|
||||
Returns:
|
||||
Dictionary with operationId, parameters, returns, summary, and description.
|
||||
"""
|
||||
try:
|
||||
sig = inspect.signature(method)
|
||||
|
||||
parameters = {}
|
||||
for param_name, param in sig.parameters.items():
|
||||
if param_name == "self":
|
||||
continue
|
||||
parameters[param_name] = type_to_openapi_schema(param.annotation)
|
||||
|
||||
return_type = type_to_openapi_schema(sig.return_annotation)
|
||||
|
||||
docstring = inspect.getdoc(method)
|
||||
|
||||
result: dict[str, Any] = {
|
||||
"operationId": method_name,
|
||||
"parameters": parameters,
|
||||
"returns": return_type,
|
||||
}
|
||||
|
||||
if docstring:
|
||||
lines = docstring.strip().split("\n")
|
||||
summary = lines[0].strip()
|
||||
|
||||
if summary:
|
||||
result["summary"] = summary
|
||||
|
||||
if len(lines) > 1:
|
||||
description = "\n".join(line.strip() for line in lines[1:]).strip()
|
||||
if description:
|
||||
result["description"] = description
|
||||
|
||||
return result
|
||||
except Exception:
|
||||
return {"operationId": method_name, "parameters": {}, "returns": {}}
|
||||
@@ -3,24 +3,20 @@
|
||||
from typing import Any, TypedDict
|
||||
|
||||
|
||||
__all__ = ["FlowStructure", "NodeMetadata", "StructureEdge"]
|
||||
|
||||
|
||||
class NodeMetadata(TypedDict, total=False):
|
||||
"""Metadata for a single node in the flow structure."""
|
||||
|
||||
type: str
|
||||
is_router: bool
|
||||
router_paths: list[str]
|
||||
router_events: list[str]
|
||||
condition_type: str | None
|
||||
trigger_condition_type: str | None
|
||||
trigger_methods: list[str]
|
||||
trigger_condition: dict[str, Any] | None
|
||||
method_signature: dict[str, Any]
|
||||
source_code: str
|
||||
source_lines: list[str]
|
||||
source_start_line: int
|
||||
source_file: str
|
||||
class_signature: str
|
||||
class_name: str
|
||||
class_line_number: int
|
||||
|
||||
|
||||
class StructureEdge(TypedDict, total=False):
|
||||
@@ -29,8 +25,8 @@ class StructureEdge(TypedDict, total=False):
|
||||
source: str
|
||||
target: str
|
||||
condition_type: str | None
|
||||
is_router_path: bool
|
||||
router_path_label: str
|
||||
is_router_event: bool
|
||||
router_event: str
|
||||
|
||||
|
||||
class FlowStructure(TypedDict):
|
||||
|
||||
@@ -1012,7 +1012,7 @@ class TestLLMObjectPreservedInContext:
|
||||
call_kwargs = mock_collapse.call_args
|
||||
assert call_kwargs.kwargs["feedback"] == "this looks good, proceed!"
|
||||
assert call_kwargs.kwargs["outcomes"] == ["needs_changes", "approved"]
|
||||
# LLM should be a live object (from _hf_llm) or reconstructed, not None
|
||||
# LLM should be a live object (from _human_feedback_llm) or reconstructed, not None
|
||||
assert call_kwargs.kwargs["llm"] is not None
|
||||
assert getattr(call_kwargs.kwargs["llm"], "model", None) == "gemini-2.0-flash"
|
||||
assert flow2.last_human_feedback.outcome == "approved"
|
||||
@@ -1171,8 +1171,8 @@ class TestAsyncHumanFeedbackEdgeCases:
|
||||
class TestLiveLLMPreservationOnResume:
|
||||
"""Tests for preserving the full LLM config across HITL resume."""
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
@@ -1191,11 +1191,11 @@ class TestLiveLLMPreservationOnResume:
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
|
||||
def test_human_feedback_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on the wrapper even when llm is a string."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@@ -1210,8 +1210,8 @@ class TestLiveLLMPreservationOnResume:
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||
@@ -1277,20 +1277,20 @@ class TestLiveLLMPreservationOnResume:
|
||||
flow.resume("looks good!")
|
||||
|
||||
# NOT the serialized string. The live_llm was captured at class definition
|
||||
# time and stored on the method wrapper as _hf_llm.
|
||||
# time and stored on the method wrapper as _human_feedback_llm.
|
||||
assert len(captured_llm) == 1
|
||||
# (which is stored on the method's _hf_llm attribute)
|
||||
# (which is stored on the method's _human_feedback_llm attribute)
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert captured_llm[0] is method._hf_llm
|
||||
assert captured_llm[0] is method._human_feedback_llm
|
||||
# And verify it's a BaseLLM instance, not a string
|
||||
assert isinstance(captured_llm[0], BaseLLM)
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_human_feedback_llm(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
|
||||
"""Test that resume_async falls back to context.llm when _human_feedback_llm is not available.
|
||||
|
||||
This ensures backward compatibility with flows that were paused before this fix.
|
||||
"""
|
||||
@@ -1325,10 +1325,10 @@ class TestLiveLLMPreservationOnResume:
|
||||
|
||||
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||
|
||||
# Remove _hf_llm to simulate old decorator without this attribute
|
||||
# Remove _human_feedback_llm to simulate old decorator without this attribute
|
||||
method = flow._methods.get("review")
|
||||
if hasattr(method, "_hf_llm"):
|
||||
delattr(method, "_hf_llm")
|
||||
if hasattr(method, "_human_feedback_llm"):
|
||||
delattr(method, "_human_feedback_llm")
|
||||
|
||||
captured_llm = []
|
||||
|
||||
@@ -1345,10 +1345,10 @@ class TestLiveLLMPreservationOnResume:
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
|
||||
def test_resume_async_uses_string_from_context_when_human_feedback_llm_is_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
|
||||
"""Test that when _human_feedback_llm is a string (not BaseLLM), we still use context.llm.
|
||||
|
||||
String LLM values offer no benefit over the serialized context.llm,
|
||||
so we don't prefer them.
|
||||
@@ -1385,7 +1385,7 @@ class TestLiveLLMPreservationOnResume:
|
||||
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||
|
||||
method = flow._methods.get("review")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
assert method._human_feedback_llm == "gpt-4o-mini"
|
||||
|
||||
captured_llm = []
|
||||
|
||||
@@ -1396,14 +1396,14 @@ class TestLiveLLMPreservationOnResume:
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# _hf_llm is a string, so resume deserializes context.llm into an LLM instance
|
||||
# _human_feedback_llm is a string, so resume deserializes context.llm into an LLM instance
|
||||
assert len(captured_llm) == 1
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
assert isinstance(captured_llm[0], BaseLLMClass)
|
||||
assert captured_llm[0].model == "gpt-4o-mini"
|
||||
|
||||
def test_hf_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _hf_llm is set on async wrapper functions."""
|
||||
def test_human_feedback_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _human_feedback_llm is set on async wrapper functions."""
|
||||
import asyncio
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
@@ -1423,5 +1423,5 @@ class TestLiveLLMPreservationOnResume:
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("async_review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is mock_llm
|
||||
|
||||
@@ -1160,9 +1160,9 @@ def test_router_cascade_chain():
|
||||
@router(process_level_1)
|
||||
def router_level_2(self):
|
||||
execution_order.append("router_level_2")
|
||||
return "level_2_path"
|
||||
return "level_2_event"
|
||||
|
||||
@listen("level_2_path")
|
||||
@listen("level_2_event")
|
||||
def process_level_2(self):
|
||||
execution_order.append("process_level_2")
|
||||
self.state["level"] = 3
|
||||
@@ -1171,9 +1171,9 @@ def test_router_cascade_chain():
|
||||
@router(process_level_2)
|
||||
def router_level_3(self):
|
||||
execution_order.append("router_level_3")
|
||||
return "final_path"
|
||||
return "final_event"
|
||||
|
||||
@listen("final_path")
|
||||
@listen("final_event")
|
||||
def finalize(self):
|
||||
execution_order.append("finalize")
|
||||
return "complete"
|
||||
@@ -1261,14 +1261,14 @@ def test_complex_and_or_branching():
|
||||
assert execution_order.index("final") > execution_order.index("branch_2b")
|
||||
|
||||
|
||||
def test_conditional_router_paths_exclusivity():
|
||||
"""Test that only the returned router path activates, not all paths."""
|
||||
def test_conditional_router_events_exclusivity():
|
||||
"""Test that only the returned router event activates, not all events."""
|
||||
execution_order = []
|
||||
|
||||
class ConditionalRouterFlow(Flow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.state["condition"] = "take_path_b"
|
||||
self.state["condition"] = "take_event_b"
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
@@ -1277,33 +1277,33 @@ def test_conditional_router_paths_exclusivity():
|
||||
@router(begin)
|
||||
def decision_point(self):
|
||||
execution_order.append("decision_point")
|
||||
if self.state["condition"] == "take_path_a":
|
||||
return "path_a"
|
||||
elif self.state["condition"] == "take_path_b":
|
||||
return "path_b"
|
||||
if self.state["condition"] == "take_event_a":
|
||||
return "event_a"
|
||||
elif self.state["condition"] == "take_event_b":
|
||||
return "event_b"
|
||||
else:
|
||||
return "path_c"
|
||||
return "event_c"
|
||||
|
||||
@listen("path_a")
|
||||
def handle_path_a(self):
|
||||
execution_order.append("handle_path_a")
|
||||
@listen("event_a")
|
||||
def handle_event_a(self):
|
||||
execution_order.append("handle_event_a")
|
||||
|
||||
@listen("path_b")
|
||||
def handle_path_b(self):
|
||||
execution_order.append("handle_path_b")
|
||||
@listen("event_b")
|
||||
def handle_event_b(self):
|
||||
execution_order.append("handle_event_b")
|
||||
|
||||
@listen("path_c")
|
||||
def handle_path_c(self):
|
||||
execution_order.append("handle_path_c")
|
||||
@listen("event_c")
|
||||
def handle_event_c(self):
|
||||
execution_order.append("handle_event_c")
|
||||
|
||||
flow = ConditionalRouterFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert "begin" in execution_order
|
||||
assert "decision_point" in execution_order
|
||||
assert "handle_path_b" in execution_order
|
||||
assert "handle_path_a" not in execution_order
|
||||
assert "handle_path_c" not in execution_order
|
||||
assert "handle_event_b" in execution_order
|
||||
assert "handle_event_a" not in execution_order
|
||||
assert "handle_event_c" not in execution_order
|
||||
|
||||
|
||||
def test_state_consistency_across_parallel_branches():
|
||||
|
||||
743
lib/crewai/tests/test_flow_definition.py
Normal file
743
lib/crewai/tests/test_flow_definition.py
Normal file
@@ -0,0 +1,743 @@
|
||||
"""Tests for the static Flow Definition contract."""
|
||||
|
||||
import ast
|
||||
from enum import Enum
|
||||
import importlib
|
||||
import inspect
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
import crewai.flow.dsl as flow_dsl
|
||||
import crewai.flow.flow_definition as flow_definition
|
||||
import crewai.flow.visualization.builder as visualization_builder
|
||||
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
|
||||
|
||||
|
||||
def test_flow_public_exports_are_explicit():
|
||||
import crewai.flow.dsl as flow_dsl
|
||||
import crewai.flow.visualization as flow_visualization
|
||||
|
||||
flow_package = importlib.import_module("crewai.flow")
|
||||
|
||||
assert "FlowDefinition" not in flow_package.__all__
|
||||
assert "FlowDefinitionDiagnostic" not in flow_package.__all__
|
||||
assert "build_flow_definition" not in flow_package.__all__
|
||||
assert "flow_structure" not in flow_package.__all__
|
||||
assert set(flow_dsl.__all__) == {"and_", "listen", "or_", "router", "start"}
|
||||
assert set(flow_definition.__all__) == {
|
||||
"FlowConfigDefinition",
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
"FlowStateDefinition",
|
||||
}
|
||||
assert "build_flow_structure" in flow_visualization.__all__
|
||||
assert "calculate_node_levels" not in flow_visualization.__all__
|
||||
|
||||
|
||||
def test_private_flow_helpers_do_not_have_docstrings():
|
||||
import crewai.flow.dsl as flow_dsl
|
||||
import crewai.flow.flow_wrappers as flow_wrappers
|
||||
import crewai.flow.human_feedback as human_feedback
|
||||
import crewai.flow.persistence.decorators as persistence_decorators
|
||||
import crewai.flow.visualization.types as visualization_types
|
||||
|
||||
modules = [
|
||||
flow_dsl,
|
||||
flow_definition,
|
||||
flow_wrappers,
|
||||
human_feedback,
|
||||
persistence_decorators,
|
||||
visualization_builder,
|
||||
visualization_types,
|
||||
]
|
||||
violations: list[str] = []
|
||||
|
||||
for module in modules:
|
||||
source_path = Path(inspect.getsourcefile(module) or "")
|
||||
tree = ast.parse(source_path.read_text())
|
||||
stack: list[ast.AST] = []
|
||||
if getattr(module, "__all__", None) == [] and ast.get_docstring(tree):
|
||||
violations.append(f"{source_path}:1:<module>")
|
||||
|
||||
class PrivateDocstringVisitor(ast.NodeVisitor):
|
||||
def visit_ClassDef(self, node: ast.ClassDef) -> None:
|
||||
self._check_docstring(node)
|
||||
stack.append(node)
|
||||
self.generic_visit(node)
|
||||
stack.pop()
|
||||
|
||||
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
|
||||
self._check_docstring(node)
|
||||
stack.append(node)
|
||||
self.generic_visit(node)
|
||||
stack.pop()
|
||||
|
||||
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
|
||||
self._check_docstring(node)
|
||||
stack.append(node)
|
||||
self.generic_visit(node)
|
||||
stack.pop()
|
||||
|
||||
def _check_docstring(
|
||||
self,
|
||||
node: ast.ClassDef | ast.FunctionDef | ast.AsyncFunctionDef,
|
||||
) -> None:
|
||||
is_dunder = node.name.startswith("__") and node.name.endswith("__")
|
||||
is_private_name = node.name.startswith("_") and not is_dunder
|
||||
is_nested_function = any(
|
||||
isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef))
|
||||
for parent in stack
|
||||
)
|
||||
if (is_private_name or is_nested_function) and ast.get_docstring(node):
|
||||
violations.append(f"{source_path}:{node.lineno}:{node.name}")
|
||||
|
||||
PrivateDocstringVisitor().visit(tree)
|
||||
|
||||
assert violations == []
|
||||
|
||||
|
||||
def test_flow_definition_contract_is_dsl_agnostic():
|
||||
source_path = Path(inspect.getsourcefile(flow_definition) or "")
|
||||
source = source_path.read_text()
|
||||
|
||||
assert "DSL" not in source
|
||||
assert "flow_wrappers" not in source
|
||||
assert "build_flow_definition" not in source
|
||||
assert "extract_flow_definition" not in source
|
||||
|
||||
|
||||
def test_flow_definition_maps_dsl_to_static_contract():
|
||||
class ContractState(BaseModel):
|
||||
topic: str = ""
|
||||
|
||||
class ContractFlow(Flow[ContractState]):
|
||||
"""A flow with every core DSL role."""
|
||||
|
||||
initial_state = ContractState
|
||||
stream = True
|
||||
max_method_calls = 7
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen(begin)
|
||||
def process(self):
|
||||
return "processed"
|
||||
|
||||
@router(process)
|
||||
def decide(self):
|
||||
return "approved"
|
||||
|
||||
@listen(or_("approved", "revise"))
|
||||
@human_feedback(
|
||||
message="Review this output.",
|
||||
emit=["done", "revise"],
|
||||
llm="gpt-4o-mini",
|
||||
default_outcome="done",
|
||||
metadata={"team": "qa"},
|
||||
learn=True,
|
||||
learn_source="hitl",
|
||||
learn_strict=True,
|
||||
)
|
||||
def review(self):
|
||||
return "review"
|
||||
|
||||
@listen(and_(begin, process))
|
||||
def audit(self):
|
||||
return "audit"
|
||||
|
||||
definition = ContractFlow.flow_definition()
|
||||
|
||||
assert definition.schema_ == "crewai.flow/v1"
|
||||
assert definition.name == "ContractFlow"
|
||||
assert definition.description == "A flow with every core DSL role."
|
||||
assert definition.state is not None
|
||||
assert definition.state.type == "pydantic"
|
||||
assert definition.state.ref and "ContractState" in definition.state.ref
|
||||
assert definition.config.stream is True
|
||||
assert definition.config.max_method_calls == 7
|
||||
|
||||
assert definition.methods["begin"].start is True
|
||||
assert definition.methods["process"].listen == "begin"
|
||||
|
||||
decide = definition.methods["decide"]
|
||||
assert decide.listen == "process"
|
||||
assert decide.router is True
|
||||
assert decide.emit is None
|
||||
|
||||
review = definition.methods["review"]
|
||||
assert review.listen == {"or": ["approved", "revise"]}
|
||||
assert review.router is True
|
||||
assert review.emit is None
|
||||
assert review.human_feedback is not None
|
||||
assert review.human_feedback.emit == ["done", "revise"]
|
||||
assert review.human_feedback.default_outcome == "done"
|
||||
assert review.human_feedback.metadata == {"team": "qa"}
|
||||
assert review.human_feedback.learn is True
|
||||
assert review.human_feedback.learn_strict is True
|
||||
|
||||
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
|
||||
assert definition.diagnostics == []
|
||||
|
||||
|
||||
def test_flow_definition_fragments_cover_start_listen_and_condition_sugar():
|
||||
class FragmentFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "begin"
|
||||
|
||||
@start("restart_event")
|
||||
def restart(self):
|
||||
return "restart"
|
||||
|
||||
@listen(begin)
|
||||
def by_callable(self):
|
||||
return "callable"
|
||||
|
||||
@listen("manual_event")
|
||||
def by_string(self):
|
||||
return "string"
|
||||
|
||||
@listen(and_(begin, by_callable))
|
||||
def by_and(self):
|
||||
return "and"
|
||||
|
||||
@listen(or_(and_("manual_event", by_string), "fallback_event"))
|
||||
def nested(self):
|
||||
return "nested"
|
||||
|
||||
definition = FragmentFlow.flow_definition()
|
||||
|
||||
assert definition.methods["begin"].start is True
|
||||
assert definition.methods["restart"].start == "restart_event"
|
||||
assert definition.methods["by_callable"].listen == "begin"
|
||||
assert definition.methods["by_string"].listen == "manual_event"
|
||||
assert definition.methods["by_and"].listen == {"and": ["begin", "by_callable"]}
|
||||
assert definition.methods["nested"].listen == {
|
||||
"or": [{"and": ["manual_event", "by_string"]}, "fallback_event"]
|
||||
}
|
||||
|
||||
assert set(FragmentFlow._start_methods) == {"begin", "restart"}
|
||||
assert FragmentFlow._listeners["restart"] == ("OR", ["restart_event"])
|
||||
assert FragmentFlow._listeners["by_callable"] == ("OR", ["begin"])
|
||||
assert FragmentFlow._listeners["by_string"] == ("OR", ["manual_event"])
|
||||
assert FragmentFlow._listeners["by_and"] == {
|
||||
"type": "AND",
|
||||
"conditions": ["begin", "by_callable"],
|
||||
}
|
||||
assert FragmentFlow._listeners["nested"] == {
|
||||
"type": "OR",
|
||||
"conditions": [
|
||||
{"type": "AND", "conditions": ["manual_event", "by_string"]},
|
||||
"fallback_event",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def test_extract_flow_definition_prefers_fragments_over_legacy_metadata():
|
||||
class RegistryFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "begin"
|
||||
|
||||
@listen(begin)
|
||||
def handle(self):
|
||||
return "handle"
|
||||
|
||||
@router(handle, emit=["done"])
|
||||
def decide(self):
|
||||
return "done"
|
||||
|
||||
handle = RegistryFlow.__dict__["handle"]
|
||||
original_trigger_methods = handle.__trigger_methods__
|
||||
handle.__trigger_methods__ = ["wrong"]
|
||||
try:
|
||||
_, listeners, routers, router_emit = flow_dsl.extract_flow_definition(
|
||||
{
|
||||
"begin": RegistryFlow.__dict__["begin"],
|
||||
"handle": handle,
|
||||
"decide": RegistryFlow.__dict__["decide"],
|
||||
}
|
||||
)
|
||||
finally:
|
||||
handle.__trigger_methods__ = original_trigger_methods
|
||||
|
||||
assert listeners["handle"] == ("OR", ["begin"])
|
||||
assert listeners["decide"] == ("OR", ["handle"])
|
||||
assert routers == {"decide"}
|
||||
assert router_emit == {"decide": ["done"]}
|
||||
|
||||
|
||||
def test_flow_definition_falls_back_to_legacy_metadata_without_fragment():
|
||||
class LegacyMetadataFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "begin"
|
||||
|
||||
@router(begin, emit=["left"])
|
||||
def decide(self):
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
for method_name in ("begin", "decide", "left"):
|
||||
method = LegacyMetadataFlow.__dict__[method_name]
|
||||
delattr(method, "__flow_method_definition__")
|
||||
|
||||
definition = flow_dsl.build_flow_definition(LegacyMetadataFlow)
|
||||
|
||||
assert definition.methods["begin"].start is True
|
||||
assert definition.methods["decide"].listen == "begin"
|
||||
assert definition.methods["decide"].router is True
|
||||
assert definition.methods["decide"].emit == ["left"]
|
||||
assert definition.methods["left"].listen == "left"
|
||||
|
||||
|
||||
def test_human_feedback_emit_overrides_inner_router_emit():
|
||||
class FeedbackOverRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "data"
|
||||
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
@router(begin, emit=["x", "y"])
|
||||
def route(self):
|
||||
return "approved"
|
||||
|
||||
@listen("approved")
|
||||
def proceed(self):
|
||||
return "ok"
|
||||
|
||||
assert "route" in FeedbackOverRouterFlow._routers
|
||||
assert FeedbackOverRouterFlow._router_emit["route"] == ["approved", "rejected"]
|
||||
|
||||
route = FeedbackOverRouterFlow.flow_definition().methods["route"]
|
||||
assert route.router is True
|
||||
assert route.human_feedback is not None
|
||||
assert route.human_feedback.emit == ["approved", "rejected"]
|
||||
assert route.emit is None
|
||||
|
||||
|
||||
def test_flow_definition_classifies_start_router_from_human_feedback_emit():
|
||||
class StartRouterFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["continue", "stop"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def entry_point(self):
|
||||
return "data"
|
||||
|
||||
@listen("continue")
|
||||
def proceed(self):
|
||||
return "proceeding"
|
||||
|
||||
@listen("stop")
|
||||
def halt(self):
|
||||
return "halted"
|
||||
|
||||
definition = StartRouterFlow.flow_definition()
|
||||
entry_point = definition.methods["entry_point"]
|
||||
|
||||
assert entry_point.is_start is True
|
||||
assert entry_point.router is True
|
||||
assert entry_point.human_feedback is not None
|
||||
assert entry_point.human_feedback.emit == ["continue", "stop"]
|
||||
assert entry_point.emit is None
|
||||
|
||||
|
||||
def test_flow_definition_round_trips_json_and_yaml():
|
||||
class RoundTripFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
definition = RoundTripFlow.flow_definition()
|
||||
|
||||
json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json())
|
||||
yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml())
|
||||
|
||||
assert json_round_trip.to_dict() == definition.to_dict()
|
||||
assert yaml_round_trip.to_dict() == definition.to_dict()
|
||||
assert yaml_round_trip.methods["decide"].router is True
|
||||
assert yaml_round_trip.methods["decide"].listen == "begin"
|
||||
|
||||
|
||||
def test_flow_definition_detects_persist_metadata():
|
||||
@persist(verbose=True)
|
||||
class PersistedFlow(Flow[dict]):
|
||||
initial_state = {}
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@persist(verbose=False)
|
||||
@listen(begin)
|
||||
def checkpoint(self):
|
||||
return "saved"
|
||||
|
||||
definition = PersistedFlow.flow_definition()
|
||||
|
||||
assert definition.persist is not None
|
||||
assert definition.persist.enabled is True
|
||||
assert definition.persist.verbose is True
|
||||
|
||||
assert definition.methods["begin"].persist is None
|
||||
|
||||
method_persist = definition.methods["checkpoint"].persist
|
||||
assert method_persist is not None
|
||||
assert method_persist.enabled is True
|
||||
assert method_persist.verbose is False
|
||||
|
||||
|
||||
def test_flow_definition_allows_dynamic_router_emit(caplog):
|
||||
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
|
||||
|
||||
class DynamicRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return self.state["dynamic_event"]
|
||||
|
||||
definition = DynamicRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit is None
|
||||
assert definition.diagnostics == []
|
||||
assert caplog.records == []
|
||||
|
||||
|
||||
def test_flow_definition_infers_literal_router_emit():
|
||||
class LiteralRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self) -> Literal["left", "right"]:
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
@listen("right")
|
||||
def right(self):
|
||||
return "right"
|
||||
|
||||
definition = LiteralRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit == ["left", "right"]
|
||||
|
||||
|
||||
def test_flow_definition_infers_enum_router_emit():
|
||||
class Decision(str, Enum):
|
||||
APPROVE = "approve"
|
||||
REJECT = "reject"
|
||||
|
||||
class EnumRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self) -> Decision:
|
||||
return Decision.APPROVE
|
||||
|
||||
@listen("approve")
|
||||
def approve(self):
|
||||
return "approve"
|
||||
|
||||
@listen("reject")
|
||||
def reject(self):
|
||||
return "reject"
|
||||
|
||||
definition = EnumRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit == ["approve", "reject"]
|
||||
|
||||
|
||||
def test_flow_definition_infers_literal_union_router_emit():
|
||||
class LiteralUnionRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self) -> Literal["left"] | Literal["right"]:
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
@listen("right")
|
||||
def right(self):
|
||||
return "right"
|
||||
|
||||
definition = LiteralUnionRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit == ["left", "right"]
|
||||
|
||||
|
||||
def test_flow_definition_does_not_infer_unannotated_router_body_emit():
|
||||
class UnannotatedRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
definition = UnannotatedRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit is None
|
||||
|
||||
|
||||
def test_flow_definition_accepts_explicit_router_events():
|
||||
class ExplicitRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin, emit=["left", "right", "left"])
|
||||
def decide(self):
|
||||
return self.state["dynamic_event"]
|
||||
|
||||
@listen("left")
|
||||
def left(self):
|
||||
return "left"
|
||||
|
||||
@listen("right")
|
||||
def right(self):
|
||||
return "right"
|
||||
|
||||
definition = ExplicitRouterFlow.flow_definition()
|
||||
|
||||
assert definition.methods["decide"].emit == ["left", "right"]
|
||||
|
||||
|
||||
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LoadedDiagnosticsFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
},
|
||||
"diagnostics": [
|
||||
{
|
||||
"code": "serialized_warning",
|
||||
"message": "Preserved serialized diagnostic",
|
||||
"severity": "warning",
|
||||
"path": "methods.decision",
|
||||
},
|
||||
{
|
||||
"code": "router_without_trigger",
|
||||
"message": "router: true requires either start or listen",
|
||||
"severity": "error",
|
||||
"path": "methods.decision",
|
||||
},
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
codes = [diagnostic.code for diagnostic in definition.diagnostics]
|
||||
assert "serialized_warning" in codes
|
||||
assert codes.count("router_without_trigger") == 1
|
||||
|
||||
|
||||
def test_router_human_feedback_preserves_existing_router_metadata():
|
||||
class RouterHumanFeedbackFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@human_feedback(message="Review route:")
|
||||
@router(begin, emit=["approved", "rejected"])
|
||||
def decide(self):
|
||||
return "approved"
|
||||
|
||||
@listen("approved")
|
||||
def approved(self):
|
||||
return "approved"
|
||||
|
||||
definition = RouterHumanFeedbackFlow.flow_definition()
|
||||
method = definition.methods["decide"]
|
||||
|
||||
assert method.router is True
|
||||
assert method.listen == "begin"
|
||||
assert method.emit == ["approved", "rejected"]
|
||||
assert method.human_feedback is not None
|
||||
|
||||
|
||||
def test_dynamic_router_does_not_log_at_class_definition_time(caplog):
|
||||
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
|
||||
|
||||
class LazyDynamicRouterFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return self.state["dynamic_event"]
|
||||
|
||||
# No diagnostics should be logged merely by defining the class -- the
|
||||
# FlowDefinition is built lazily.
|
||||
assert caplog.records == []
|
||||
|
||||
# Explicit access still should not log visualization-only diagnostics.
|
||||
definition = LazyDynamicRouterFlow.flow_definition()
|
||||
assert definition.diagnostics == []
|
||||
assert caplog.records == []
|
||||
|
||||
|
||||
def test_dynamic_router_string_listener_is_valid_contract():
|
||||
class DynamicRouterListenerFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def decide(self):
|
||||
return self.state["dynamic_event"]
|
||||
|
||||
@listen("dynamic_event")
|
||||
def handle(self):
|
||||
return "handled"
|
||||
|
||||
definition = DynamicRouterListenerFlow.flow_definition()
|
||||
|
||||
assert definition.diagnostics == []
|
||||
|
||||
|
||||
def test_static_string_listener_is_allowed_by_contract():
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "TypoFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"listen": "begni"},
|
||||
},
|
||||
}
|
||||
)
|
||||
assert definition.diagnostics == []
|
||||
|
||||
|
||||
def test_start_false_not_classified_as_start_method():
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ExplicitNonStartFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"handle": {"start": False, "listen": "begin"},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
assert definition.methods["begin"].is_start is True
|
||||
assert definition.methods["handle"].is_start is False
|
||||
|
||||
class ExplicitNonStartFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen(begin)
|
||||
def handle(self):
|
||||
return "handled"
|
||||
|
||||
# Attach the loaded contract (with explicit ``start: false``) so the
|
||||
# projections read from it rather than rebuilding from the DSL.
|
||||
ExplicitNonStartFlow._flow_definition = definition
|
||||
|
||||
flow = ExplicitNonStartFlow()
|
||||
viz_structure = visualization_builder.build_flow_structure(flow)
|
||||
assert "handle" not in viz_structure["start_methods"]
|
||||
assert viz_structure["nodes"]["handle"]["type"] != "start"
|
||||
|
||||
|
||||
def test_flow_definition_cache_is_not_inherited_by_subclasses():
|
||||
class ParentFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "begin"
|
||||
|
||||
parent_definition = ParentFlow.flow_definition()
|
||||
|
||||
class ChildFlow(ParentFlow):
|
||||
@listen(ParentFlow.begin)
|
||||
def child_step(self):
|
||||
return "child"
|
||||
|
||||
child_definition = ChildFlow.flow_definition()
|
||||
|
||||
assert parent_definition.name == "ParentFlow"
|
||||
assert child_definition.name == "ChildFlow"
|
||||
assert child_definition is not parent_definition
|
||||
assert set(child_definition.methods) == {"begin", "child_step"}
|
||||
|
||||
|
||||
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
|
||||
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
|
||||
|
||||
definition = flow_definition.FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "LoadedFlow",
|
||||
"methods": {
|
||||
"decision": {
|
||||
"router": True,
|
||||
"emit": ["continue"],
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
assert any(
|
||||
diagnostic.code == "router_without_trigger"
|
||||
for diagnostic in definition.diagnostics
|
||||
)
|
||||
assert any(
|
||||
record.levelno == logging.ERROR
|
||||
and "LoadedFlow" in record.message
|
||||
and "router_without_trigger" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
@@ -1,818 +0,0 @@
|
||||
"""Tests for flow_serializer.py - Flow structure serialization for Studio UI."""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow_serializer import flow_structure
|
||||
from crewai.flow.human_feedback import human_feedback
|
||||
|
||||
|
||||
class TestSimpleLinearFlow:
|
||||
"""Test simple linear flow (start → listen → listen)."""
|
||||
|
||||
def test_linear_flow_structure(self):
|
||||
"""Test a simple sequential flow structure."""
|
||||
|
||||
class LinearFlow(Flow):
|
||||
"""A simple linear flow for testing."""
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen(begin)
|
||||
def process(self):
|
||||
return "processed"
|
||||
|
||||
@listen(process)
|
||||
def finalize(self):
|
||||
return "done"
|
||||
|
||||
structure = flow_structure(LinearFlow)
|
||||
|
||||
assert structure["name"] == "LinearFlow"
|
||||
assert structure["description"] == "A simple linear flow for testing."
|
||||
assert len(structure["methods"]) == 3
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["begin"]["type"] == "start"
|
||||
assert method_map["process"]["type"] == "listen"
|
||||
assert method_map["finalize"]["type"] == "listen"
|
||||
|
||||
assert len(structure["edges"]) == 2
|
||||
|
||||
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
|
||||
assert ("begin", "process") in edge_pairs
|
||||
assert ("process", "finalize") in edge_pairs
|
||||
|
||||
for edge in structure["edges"]:
|
||||
assert edge["edge_type"] == "listen"
|
||||
assert edge["condition"] is None
|
||||
|
||||
|
||||
class TestRouterFlow:
|
||||
"""Test flow with router branching."""
|
||||
|
||||
def test_router_flow_structure(self):
|
||||
"""Test a flow with router that branches to different paths."""
|
||||
|
||||
class BranchingFlow(Flow):
|
||||
@start()
|
||||
def init(self):
|
||||
return "initialized"
|
||||
|
||||
@router(init)
|
||||
def decide(self) -> Literal["path_a", "path_b"]:
|
||||
return "path_a"
|
||||
|
||||
@listen("path_a")
|
||||
def handle_a(self):
|
||||
return "handled_a"
|
||||
|
||||
@listen("path_b")
|
||||
def handle_b(self):
|
||||
return "handled_b"
|
||||
|
||||
structure = flow_structure(BranchingFlow)
|
||||
|
||||
assert structure["name"] == "BranchingFlow"
|
||||
assert len(structure["methods"]) == 4
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["init"]["type"] == "start"
|
||||
assert method_map["decide"]["type"] == "router"
|
||||
assert method_map["handle_a"]["type"] == "listen"
|
||||
assert method_map["handle_b"]["type"] == "listen"
|
||||
|
||||
assert "path_a" in method_map["decide"]["router_paths"]
|
||||
assert "path_b" in method_map["decide"]["router_paths"]
|
||||
|
||||
# Should have: init -> decide (listen), decide -> handle_a (route), decide -> handle_b (route)
|
||||
listen_edges = [e for e in structure["edges"] if e["edge_type"] == "listen"]
|
||||
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
|
||||
|
||||
assert len(listen_edges) == 1
|
||||
assert listen_edges[0]["from_method"] == "init"
|
||||
assert listen_edges[0]["to_method"] == "decide"
|
||||
|
||||
assert len(route_edges) == 2
|
||||
route_targets = {e["to_method"] for e in route_edges}
|
||||
assert "handle_a" in route_targets
|
||||
assert "handle_b" in route_targets
|
||||
|
||||
route_conditions = {e["to_method"]: e["condition"] for e in route_edges}
|
||||
assert route_conditions["handle_a"] == "path_a"
|
||||
assert route_conditions["handle_b"] == "path_b"
|
||||
|
||||
|
||||
class TestAndOrConditions:
|
||||
"""Test flow with AND/OR conditions."""
|
||||
|
||||
def test_and_condition_flow(self):
|
||||
"""Test a flow where a method waits for multiple methods (AND)."""
|
||||
|
||||
class AndConditionFlow(Flow):
|
||||
@start()
|
||||
def step_a(self):
|
||||
return "a"
|
||||
|
||||
@start()
|
||||
def step_b(self):
|
||||
return "b"
|
||||
|
||||
@listen(and_(step_a, step_b))
|
||||
def converge(self):
|
||||
return "converged"
|
||||
|
||||
structure = flow_structure(AndConditionFlow)
|
||||
|
||||
assert len(structure["methods"]) == 3
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["step_a"]["type"] == "start"
|
||||
assert method_map["step_b"]["type"] == "start"
|
||||
assert method_map["converge"]["type"] == "listen"
|
||||
|
||||
assert method_map["converge"]["condition_type"] == "AND"
|
||||
|
||||
triggers = method_map["converge"]["trigger_methods"]
|
||||
assert "step_a" in triggers
|
||||
assert "step_b" in triggers
|
||||
|
||||
converge_edges = [e for e in structure["edges"] if e["to_method"] == "converge"]
|
||||
assert len(converge_edges) == 2
|
||||
|
||||
def test_or_condition_flow(self):
|
||||
"""Test a flow where a method is triggered by any of multiple methods (OR)."""
|
||||
|
||||
class OrConditionFlow(Flow):
|
||||
@start()
|
||||
def path_1(self):
|
||||
return "1"
|
||||
|
||||
@start()
|
||||
def path_2(self):
|
||||
return "2"
|
||||
|
||||
@listen(or_(path_1, path_2))
|
||||
def handle_any(self):
|
||||
return "handled"
|
||||
|
||||
structure = flow_structure(OrConditionFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["handle_any"]["condition_type"] == "OR"
|
||||
|
||||
triggers = method_map["handle_any"]["trigger_methods"]
|
||||
assert "path_1" in triggers
|
||||
assert "path_2" in triggers
|
||||
|
||||
|
||||
class TestHumanFeedbackMethods:
|
||||
"""Test flow with @human_feedback decorated methods."""
|
||||
|
||||
def test_human_feedback_detection(self):
|
||||
"""Test that human feedback methods are correctly identified."""
|
||||
|
||||
class HumanFeedbackFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Please review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review_step(self):
|
||||
return "content to review"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
return "approved"
|
||||
|
||||
@listen("rejected")
|
||||
def handle_rejected(self):
|
||||
return "rejected"
|
||||
|
||||
structure = flow_structure(HumanFeedbackFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
# review_step should have human feedback
|
||||
assert method_map["review_step"]["has_human_feedback"] is True
|
||||
# It's a start+router (due to emit)
|
||||
assert method_map["review_step"]["type"] == "start_router"
|
||||
assert "approved" in method_map["review_step"]["router_paths"]
|
||||
assert "rejected" in method_map["review_step"]["router_paths"]
|
||||
|
||||
# Other methods should not have human feedback
|
||||
assert method_map["handle_approved"]["has_human_feedback"] is False
|
||||
assert method_map["handle_rejected"]["has_human_feedback"] is False
|
||||
|
||||
def test_listen_plus_human_feedback_router_edges(self):
|
||||
"""Test that @listen + @human_feedback(emit=...) generates router edges.
|
||||
|
||||
This is the pattern used in the whitepaper generator:
|
||||
a listener method that also acts as a router via @human_feedback(emit=[...]).
|
||||
The serializer must generate edges from this method to listeners of its emit paths.
|
||||
"""
|
||||
|
||||
class ReviewFlow(Flow):
|
||||
@start()
|
||||
def generate(self):
|
||||
return "content"
|
||||
|
||||
@listen(generate)
|
||||
@human_feedback(
|
||||
message="Review this:",
|
||||
emit=["approved", "needs_changes", "cancelled"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "review result"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
return "done"
|
||||
|
||||
@listen("needs_changes")
|
||||
def handle_changes(self):
|
||||
return "regenerating"
|
||||
|
||||
@listen("cancelled")
|
||||
def handle_cancelled(self):
|
||||
return "cancelled"
|
||||
|
||||
structure = flow_structure(ReviewFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
edge_set = {(e["from_method"], e["to_method"], e.get("condition")) for e in structure["edges"]}
|
||||
|
||||
# review should be detected as a router with the emit paths
|
||||
assert method_map["review"]["type"] == "router"
|
||||
assert set(method_map["review"]["router_paths"]) == {"approved", "needs_changes", "cancelled"}
|
||||
assert method_map["review"]["has_human_feedback"] is True
|
||||
|
||||
assert ("generate", "review", None) in edge_set
|
||||
|
||||
assert ("review", "handle_approved", "approved") in edge_set
|
||||
assert ("review", "handle_changes", "needs_changes") in edge_set
|
||||
assert ("review", "handle_cancelled", "cancelled") in edge_set
|
||||
|
||||
|
||||
class TestCrewReferences:
|
||||
"""Test detection of Crew references in method bodies."""
|
||||
|
||||
def test_crew_detection_with_crew_call(self):
|
||||
"""Test that .crew() calls are detected."""
|
||||
|
||||
class FlowWithCrew(Flow):
|
||||
@start()
|
||||
def run_crew(self):
|
||||
return "result"
|
||||
|
||||
@listen(run_crew)
|
||||
def no_crew(self):
|
||||
return "done"
|
||||
|
||||
structure = flow_structure(FlowWithCrew)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
# Note: Since the actual .crew() call is in a comment/string,
|
||||
# We're testing the mechanism exists.
|
||||
assert "has_crew" in method_map["run_crew"]
|
||||
assert "has_crew" in method_map["no_crew"]
|
||||
|
||||
def test_no_crew_when_absent(self):
|
||||
"""Test that methods without Crew refs return has_crew=False."""
|
||||
|
||||
class SimpleNonCrewFlow(Flow):
|
||||
@start()
|
||||
def calculate(self):
|
||||
return 1 + 1
|
||||
|
||||
@listen(calculate)
|
||||
def display(self):
|
||||
return "result"
|
||||
|
||||
structure = flow_structure(SimpleNonCrewFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["calculate"]["has_crew"] is False
|
||||
assert method_map["display"]["has_crew"] is False
|
||||
|
||||
|
||||
class TestTypedStateSchema:
|
||||
"""Test flow with typed Pydantic state."""
|
||||
|
||||
def test_pydantic_state_schema_extraction(self):
|
||||
"""Test extracting state schema from a Flow with Pydantic state."""
|
||||
|
||||
class MyState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
items: list[str] = Field(default_factory=list)
|
||||
|
||||
class TypedStateFlow(Flow[MyState]):
|
||||
initial_state = MyState
|
||||
|
||||
@start()
|
||||
def increment(self):
|
||||
self.state.counter += 1
|
||||
return self.state.counter
|
||||
|
||||
@listen(increment)
|
||||
def display(self):
|
||||
return f"Count: {self.state.counter}"
|
||||
|
||||
structure = flow_structure(TypedStateFlow)
|
||||
|
||||
assert structure["state_schema"] is not None
|
||||
fields = structure["state_schema"]["fields"]
|
||||
|
||||
field_names = {f["name"] for f in fields}
|
||||
assert "counter" in field_names
|
||||
assert "message" in field_names
|
||||
assert "items" in field_names
|
||||
|
||||
field_map = {f["name"]: f for f in fields}
|
||||
assert "int" in field_map["counter"]["type"]
|
||||
assert "str" in field_map["message"]["type"]
|
||||
|
||||
assert field_map["counter"]["default"] == 0
|
||||
assert field_map["message"]["default"] == ""
|
||||
|
||||
def test_dict_state_returns_none(self):
|
||||
"""Test that flows using dict state return None for state_schema."""
|
||||
|
||||
class DictStateFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
self.state["count"] = 1
|
||||
return "started"
|
||||
|
||||
structure = flow_structure(DictStateFlow)
|
||||
|
||||
assert structure["state_schema"] is None
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and special scenarios."""
|
||||
|
||||
def test_start_router_combo(self):
|
||||
"""Test a method that is both @start and a router (via human_feedback emit)."""
|
||||
|
||||
class StartRouterFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["continue", "stop"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def entry_point(self):
|
||||
return "data"
|
||||
|
||||
@listen("continue")
|
||||
def proceed(self):
|
||||
return "proceeding"
|
||||
|
||||
@listen("stop")
|
||||
def halt(self):
|
||||
return "halted"
|
||||
|
||||
structure = flow_structure(StartRouterFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["entry_point"]["type"] == "start_router"
|
||||
assert method_map["entry_point"]["has_human_feedback"] is True
|
||||
assert "continue" in method_map["entry_point"]["router_paths"]
|
||||
assert "stop" in method_map["entry_point"]["router_paths"]
|
||||
|
||||
def test_multiple_start_methods(self):
|
||||
"""Test a flow with multiple start methods."""
|
||||
|
||||
class MultiStartFlow(Flow):
|
||||
@start()
|
||||
def start_a(self):
|
||||
return "a"
|
||||
|
||||
@start()
|
||||
def start_b(self):
|
||||
return "b"
|
||||
|
||||
@listen(and_(start_a, start_b))
|
||||
def combine(self):
|
||||
return "combined"
|
||||
|
||||
structure = flow_structure(MultiStartFlow)
|
||||
|
||||
start_methods = [m for m in structure["methods"] if m["type"] == "start"]
|
||||
assert len(start_methods) == 2
|
||||
|
||||
start_names = {m["name"] for m in start_methods}
|
||||
assert "start_a" in start_names
|
||||
assert "start_b" in start_names
|
||||
|
||||
def test_orphan_methods(self):
|
||||
"""Test that orphan methods (not connected to flow) are still captured."""
|
||||
|
||||
class FlowWithOrphan(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
@listen(begin)
|
||||
def connected(self):
|
||||
return "connected"
|
||||
|
||||
@listen("never_triggered")
|
||||
def orphan(self):
|
||||
return "orphan"
|
||||
|
||||
structure = flow_structure(FlowWithOrphan)
|
||||
|
||||
method_names = {m["name"] for m in structure["methods"]}
|
||||
assert "orphan" in method_names
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
assert method_map["orphan"]["trigger_methods"] == ["never_triggered"]
|
||||
|
||||
def test_empty_flow(self):
|
||||
"""Test building structure for a flow with no methods."""
|
||||
|
||||
class EmptyFlow(Flow):
|
||||
pass
|
||||
|
||||
structure = flow_structure(EmptyFlow)
|
||||
|
||||
assert structure["name"] == "EmptyFlow"
|
||||
assert structure["methods"] == []
|
||||
assert structure["edges"] == []
|
||||
assert structure["state_schema"] is None
|
||||
|
||||
def test_flow_with_docstring(self):
|
||||
"""Test that flow docstring is captured."""
|
||||
|
||||
class DocumentedFlow(Flow):
|
||||
"""This is a well-documented flow.
|
||||
|
||||
It has multiple lines of documentation.
|
||||
"""
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
structure = flow_structure(DocumentedFlow)
|
||||
|
||||
assert structure["description"] is not None
|
||||
assert "well-documented flow" in structure["description"]
|
||||
|
||||
def test_flow_without_docstring(self):
|
||||
"""Test that missing docstring returns None."""
|
||||
|
||||
class UndocumentedFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
structure = flow_structure(UndocumentedFlow)
|
||||
|
||||
assert structure["description"] is None
|
||||
|
||||
def test_nested_conditions(self):
|
||||
"""Test flow with nested AND/OR conditions."""
|
||||
|
||||
class NestedConditionFlow(Flow):
|
||||
@start()
|
||||
def a(self):
|
||||
return "a"
|
||||
|
||||
@start()
|
||||
def b(self):
|
||||
return "b"
|
||||
|
||||
@start()
|
||||
def c(self):
|
||||
return "c"
|
||||
|
||||
@listen(or_(and_(a, b), c))
|
||||
def complex_trigger(self):
|
||||
return "triggered"
|
||||
|
||||
structure = flow_structure(NestedConditionFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
triggers = method_map["complex_trigger"]["trigger_methods"]
|
||||
assert len(triggers) == 3
|
||||
assert "a" in triggers
|
||||
assert "b" in triggers
|
||||
assert "c" in triggers
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test error handling and validation."""
|
||||
|
||||
def test_instance_raises_type_error(self):
|
||||
"""Test that passing an instance raises TypeError."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
flow_instance = TestFlow()
|
||||
|
||||
with pytest.raises(TypeError) as exc_info:
|
||||
flow_structure(flow_instance)
|
||||
|
||||
assert "requires a Flow class, not an instance" in str(exc_info.value)
|
||||
|
||||
def test_non_class_raises_type_error(self):
|
||||
"""Test that passing non-class raises TypeError."""
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
flow_structure("not a class")
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
flow_structure(123)
|
||||
|
||||
|
||||
class TestEdgeGeneration:
|
||||
"""Test edge generation in various scenarios."""
|
||||
|
||||
def test_all_edges_generated_correctly(self):
|
||||
"""Verify all edges are correctly generated for a complex flow."""
|
||||
|
||||
class ComplexFlow(Flow):
|
||||
@start()
|
||||
def entry(self):
|
||||
return "started"
|
||||
|
||||
@listen(entry)
|
||||
def step_1(self):
|
||||
return "step_1"
|
||||
|
||||
@router(step_1)
|
||||
def branch(self) -> Literal["left", "right"]:
|
||||
return "left"
|
||||
|
||||
@listen("left")
|
||||
def left_path(self):
|
||||
return "left_done"
|
||||
|
||||
@listen("right")
|
||||
def right_path(self):
|
||||
return "right_done"
|
||||
|
||||
@listen(or_(left_path, right_path))
|
||||
def converge(self):
|
||||
return "done"
|
||||
|
||||
structure = flow_structure(ComplexFlow)
|
||||
|
||||
edges = structure["edges"]
|
||||
|
||||
listen_edges = [(e["from_method"], e["to_method"]) for e in edges if e["edge_type"] == "listen"]
|
||||
|
||||
assert ("entry", "step_1") in listen_edges
|
||||
assert ("step_1", "branch") in listen_edges
|
||||
assert ("left_path", "converge") in listen_edges
|
||||
assert ("right_path", "converge") in listen_edges
|
||||
|
||||
route_edges = [(e["from_method"], e["to_method"], e["condition"]) for e in edges if e["edge_type"] == "route"]
|
||||
|
||||
assert ("branch", "left_path", "left") in route_edges
|
||||
assert ("branch", "right_path", "right") in route_edges
|
||||
|
||||
def test_router_edge_conditions(self):
|
||||
"""Test that router edge conditions are properly set."""
|
||||
|
||||
class RouterConditionFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "start"
|
||||
|
||||
@router(begin)
|
||||
def route(self) -> Literal["option_1", "option_2", "option_3"]:
|
||||
return "option_1"
|
||||
|
||||
@listen("option_1")
|
||||
def handle_1(self):
|
||||
return "1"
|
||||
|
||||
@listen("option_2")
|
||||
def handle_2(self):
|
||||
return "2"
|
||||
|
||||
@listen("option_3")
|
||||
def handle_3(self):
|
||||
return "3"
|
||||
|
||||
structure = flow_structure(RouterConditionFlow)
|
||||
|
||||
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
|
||||
|
||||
assert len(route_edges) == 3
|
||||
|
||||
conditions = {e["to_method"]: e["condition"] for e in route_edges}
|
||||
assert conditions["handle_1"] == "option_1"
|
||||
assert conditions["handle_2"] == "option_2"
|
||||
assert conditions["handle_3"] == "option_3"
|
||||
|
||||
|
||||
class TestMethodTypeClassification:
|
||||
"""Test method type classification."""
|
||||
|
||||
def test_all_method_types(self):
|
||||
"""Test classification of all method types."""
|
||||
|
||||
class AllTypesFlow(Flow):
|
||||
@start()
|
||||
def start_only(self):
|
||||
return "start"
|
||||
|
||||
@listen(start_only)
|
||||
def listen_only(self):
|
||||
return "listen"
|
||||
|
||||
@router(listen_only)
|
||||
def router_only(self) -> Literal["path"]:
|
||||
return "path"
|
||||
|
||||
@listen("path")
|
||||
def after_router(self):
|
||||
return "after"
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review",
|
||||
emit=["yes", "no"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def start_and_router(self):
|
||||
return "data"
|
||||
|
||||
structure = flow_structure(AllTypesFlow)
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
|
||||
assert method_map["start_only"]["type"] == "start"
|
||||
assert method_map["listen_only"]["type"] == "listen"
|
||||
assert method_map["router_only"]["type"] == "router"
|
||||
assert method_map["after_router"]["type"] == "listen"
|
||||
assert method_map["start_and_router"]["type"] == "start_router"
|
||||
|
||||
|
||||
class TestInputDetection:
|
||||
"""Test flow input detection."""
|
||||
|
||||
def test_inputs_list_exists(self):
|
||||
"""Test that inputs list is always present."""
|
||||
|
||||
class SimpleFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
structure = flow_structure(SimpleFlow)
|
||||
|
||||
assert "inputs" in structure
|
||||
assert isinstance(structure["inputs"], list)
|
||||
|
||||
|
||||
class TestJsonSerializable:
|
||||
"""Test that output is JSON serializable."""
|
||||
|
||||
def test_structure_is_json_serializable(self):
|
||||
"""Test that the entire structure can be JSON serialized."""
|
||||
import json
|
||||
|
||||
class MyState(BaseModel):
|
||||
value: int = 0
|
||||
|
||||
class SerializableFlow(Flow[MyState]):
|
||||
"""Test flow for JSON serialization."""
|
||||
|
||||
initial_state = MyState
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review",
|
||||
emit=["ok", "not_ok"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def begin(self):
|
||||
return "data"
|
||||
|
||||
@listen("ok")
|
||||
def proceed(self):
|
||||
return "done"
|
||||
|
||||
structure = flow_structure(SerializableFlow)
|
||||
|
||||
json_str = json.dumps(structure)
|
||||
assert json_str is not None
|
||||
|
||||
parsed = json.loads(json_str)
|
||||
assert parsed["name"] == "SerializableFlow"
|
||||
assert len(parsed["methods"]) > 0
|
||||
|
||||
|
||||
class TestFlowInheritance:
|
||||
"""Test flow inheritance scenarios."""
|
||||
|
||||
def test_child_flow_inherits_parent_methods(self):
|
||||
"""Test that FlowB inheriting from FlowA includes methods from both.
|
||||
|
||||
Note: FlowMeta propagates methods but does NOT fully propagate the
|
||||
_listeners registry from parent classes. This means edges defined
|
||||
in the parent class (e.g., parent_start -> parent_process) may not
|
||||
appear in the child's structure. This is a known FlowMeta limitation.
|
||||
"""
|
||||
|
||||
class FlowA(Flow):
|
||||
"""Parent flow with start method."""
|
||||
|
||||
@start()
|
||||
def parent_start(self):
|
||||
return "parent started"
|
||||
|
||||
@listen(parent_start)
|
||||
def parent_process(self):
|
||||
return "parent processed"
|
||||
|
||||
class FlowB(FlowA):
|
||||
"""Child flow with additional methods."""
|
||||
|
||||
@listen(FlowA.parent_process)
|
||||
def child_continue(self):
|
||||
return "child continued"
|
||||
|
||||
@listen(child_continue)
|
||||
def child_finalize(self):
|
||||
return "child finalized"
|
||||
|
||||
structure = flow_structure(FlowB)
|
||||
|
||||
assert structure["name"] == "FlowB"
|
||||
|
||||
method_names = {m["name"] for m in structure["methods"]}
|
||||
assert "parent_start" in method_names
|
||||
assert "parent_process" in method_names
|
||||
assert "child_continue" in method_names
|
||||
assert "child_finalize" in method_names
|
||||
|
||||
method_map = {m["name"]: m for m in structure["methods"]}
|
||||
assert method_map["parent_start"]["type"] == "start"
|
||||
assert method_map["parent_process"]["type"] == "listen"
|
||||
assert method_map["child_continue"]["type"] == "listen"
|
||||
assert method_map["child_finalize"]["type"] == "listen"
|
||||
|
||||
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
|
||||
assert ("parent_process", "child_continue") in edge_pairs
|
||||
assert ("child_continue", "child_finalize") in edge_pairs
|
||||
|
||||
# KNOWN LIMITATION: Edges defined in parent class (parent_start -> parent_process)
|
||||
# are NOT propagated to child's _listeners registry by FlowMeta.
|
||||
# This is a FlowMeta limitation, not a serializer bug.
|
||||
|
||||
def test_child_flow_can_override_parent_method(self):
|
||||
"""Test that child can override parent methods."""
|
||||
|
||||
class BaseFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "base begin"
|
||||
|
||||
@listen(begin)
|
||||
def process(self):
|
||||
return "base process"
|
||||
|
||||
class ExtendedFlow(BaseFlow):
|
||||
@listen(BaseFlow.begin)
|
||||
def process(self):
|
||||
return "extended process"
|
||||
|
||||
@listen(process)
|
||||
def finalize(self):
|
||||
return "extended finalize"
|
||||
|
||||
structure = flow_structure(ExtendedFlow)
|
||||
|
||||
method_names = {m["name"] for m in structure["methods"]}
|
||||
assert "begin" in method_names
|
||||
assert "process" in method_names
|
||||
assert "finalize" in method_names
|
||||
|
||||
# Should have 3 methods total (not 4, since process is overridden)
|
||||
assert len(structure["methods"]) == 3
|
||||
@@ -8,6 +8,7 @@ from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow_definition import FlowDefinition
|
||||
from crewai.flow.visualization import (
|
||||
build_flow_structure,
|
||||
visualize_flow_structure,
|
||||
@@ -36,14 +37,14 @@ class RouterFlow(Flow):
|
||||
@router(init)
|
||||
def decide(self):
|
||||
if hasattr(self, "state") and self.state.get("path") == "b":
|
||||
return "path_b"
|
||||
return "path_a"
|
||||
return "event_b"
|
||||
return "event_a"
|
||||
|
||||
@listen("path_a")
|
||||
@listen("event_a")
|
||||
def handle_a(self):
|
||||
return "handled_a"
|
||||
|
||||
@listen("path_b")
|
||||
@listen("event_b")
|
||||
def handle_b(self):
|
||||
return "handled_b"
|
||||
|
||||
@@ -69,13 +70,23 @@ class ComplexFlow(Flow):
|
||||
|
||||
@router(converge_and)
|
||||
def router_decision(self):
|
||||
return "final_path"
|
||||
return "final_event"
|
||||
|
||||
@listen("final_path")
|
||||
@listen("final_event")
|
||||
def finalize(self):
|
||||
return "complete"
|
||||
|
||||
|
||||
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
|
||||
flow_class._flow_definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": flow_class.__name__,
|
||||
"methods": methods,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def test_build_flow_structure_simple():
|
||||
"""Test building structure for a simple sequential flow."""
|
||||
flow = SimpleFlow()
|
||||
@@ -98,6 +109,47 @@ def test_build_flow_structure_simple():
|
||||
assert edge["condition_type"] == "OR"
|
||||
|
||||
|
||||
def test_build_flow_structure_from_flow_class():
|
||||
"""Test building structure from a Flow class via its FlowDefinition."""
|
||||
structure = build_flow_structure(SimpleFlow)
|
||||
|
||||
assert set(structure["nodes"]) == {"begin", "process"}
|
||||
assert structure["start_methods"] == ["begin"]
|
||||
assert structure["nodes"]["begin"]["class_name"] == "SimpleFlow"
|
||||
|
||||
|
||||
def test_build_flow_structure_from_flow_definition():
|
||||
"""Test building visualization directly from a FlowDefinition."""
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "DefinedFlow",
|
||||
"methods": {
|
||||
"begin": {"start": True},
|
||||
"decide": {
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["done"],
|
||||
},
|
||||
"finish": {"listen": "done"},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
structure = build_flow_structure(definition)
|
||||
|
||||
assert set(structure["nodes"]) == {"begin", "decide", "finish"}
|
||||
assert structure["start_methods"] == ["begin"]
|
||||
assert structure["router_methods"] == ["decide"]
|
||||
assert structure["nodes"]["begin"]["class_name"] == "DefinedFlow"
|
||||
assert any(
|
||||
edge["source"] == "decide"
|
||||
and edge["target"] == "finish"
|
||||
and edge["router_event"] == "done"
|
||||
for edge in structure["edges"]
|
||||
)
|
||||
|
||||
|
||||
def test_build_flow_structure_with_router():
|
||||
"""Test building structure for a flow with router."""
|
||||
flow = RouterFlow()
|
||||
@@ -111,13 +163,10 @@ def test_build_flow_structure_with_router():
|
||||
|
||||
router_node = structure["nodes"]["decide"]
|
||||
assert router_node["type"] == "router"
|
||||
assert "router_events" not in router_node
|
||||
|
||||
if "router_paths" in router_node:
|
||||
assert len(router_node["router_paths"]) >= 1
|
||||
assert any("path" in path for path in router_node["router_paths"])
|
||||
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
|
||||
assert len(router_edges) >= 1
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_event"]]
|
||||
assert router_edges == []
|
||||
|
||||
|
||||
def test_build_flow_structure_with_and_or_conditions():
|
||||
@@ -203,49 +252,40 @@ def test_visualize_flow_structure_json_data():
|
||||
assert "handle_b" in js_content
|
||||
|
||||
assert "router" in js_content.lower()
|
||||
assert "path_a" in js_content
|
||||
assert "path_b" in js_content
|
||||
assert "event_a" in js_content
|
||||
assert "event_b" in js_content
|
||||
|
||||
|
||||
def test_node_metadata_includes_source_info():
|
||||
"""Test that nodes include source code and line number information."""
|
||||
def test_node_metadata_omits_source_info():
|
||||
"""Test that definition-only visualization omits Python source metadata."""
|
||||
flow = SimpleFlow()
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
for node_name, node_metadata in structure["nodes"].items():
|
||||
assert node_metadata["source_code"] is not None
|
||||
assert len(node_metadata["source_code"]) > 0
|
||||
assert node_metadata["source_start_line"] is not None
|
||||
assert node_metadata["source_start_line"] > 0
|
||||
assert node_metadata["source_file"] is not None
|
||||
assert node_metadata["source_file"].endswith(".py")
|
||||
for node_metadata in structure["nodes"].values():
|
||||
assert "source_code" not in node_metadata
|
||||
assert "source_lines" not in node_metadata
|
||||
assert "source_start_line" not in node_metadata
|
||||
assert "source_file" not in node_metadata
|
||||
|
||||
|
||||
def test_node_metadata_includes_method_signature():
|
||||
"""Test that nodes include method signature information."""
|
||||
def test_node_metadata_omits_method_signature():
|
||||
"""Test that definition-only visualization omits Python method signatures."""
|
||||
flow = SimpleFlow()
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
begin_node = structure["nodes"]["begin"]
|
||||
assert begin_node["method_signature"] is not None
|
||||
assert "operationId" in begin_node["method_signature"]
|
||||
assert begin_node["method_signature"]["operationId"] == "begin"
|
||||
assert "parameters" in begin_node["method_signature"]
|
||||
assert "returns" in begin_node["method_signature"]
|
||||
assert "method_signature" not in begin_node
|
||||
|
||||
|
||||
def test_router_node_has_correct_metadata():
|
||||
"""Test that router nodes have correct type and paths."""
|
||||
"""Test that router nodes have correct type and event metadata."""
|
||||
flow = RouterFlow()
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
router_node = structure["nodes"]["decide"]
|
||||
assert router_node["type"] == "router"
|
||||
assert router_node["is_router"] is True
|
||||
assert router_node["router_paths"] is not None
|
||||
assert len(router_node["router_paths"]) == 2
|
||||
assert "path_a" in router_node["router_paths"]
|
||||
assert "path_b" in router_node["router_paths"]
|
||||
assert "router_events" not in router_node
|
||||
|
||||
|
||||
def test_listen_node_has_trigger_methods():
|
||||
@@ -255,7 +295,7 @@ def test_listen_node_has_trigger_methods():
|
||||
|
||||
handle_a_node = structure["nodes"]["handle_a"]
|
||||
assert handle_a_node["trigger_methods"] is not None
|
||||
assert "path_a" in handle_a_node["trigger_methods"]
|
||||
assert "event_a" in handle_a_node["trigger_methods"]
|
||||
|
||||
|
||||
def test_and_condition_node_metadata():
|
||||
@@ -317,16 +357,15 @@ def test_topological_path_counting():
|
||||
assert len(structure["edges"]) > 0
|
||||
|
||||
|
||||
def test_class_signature_metadata():
|
||||
"""Test that nodes include class signature information."""
|
||||
def test_class_metadata_comes_from_definition():
|
||||
"""Test that nodes include only definition-derived class metadata."""
|
||||
flow = SimpleFlow()
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
for node_name, node_metadata in structure["nodes"].items():
|
||||
for node_metadata in structure["nodes"].values():
|
||||
assert node_metadata["class_name"] is not None
|
||||
assert node_metadata["class_name"] == "SimpleFlow"
|
||||
assert node_metadata["class_signature"] is not None
|
||||
assert "SimpleFlow" in node_metadata["class_signature"]
|
||||
assert "class_signature" not in node_metadata
|
||||
|
||||
|
||||
def test_visualization_plot_method():
|
||||
@@ -338,8 +377,8 @@ def test_visualization_plot_method():
|
||||
assert os.path.exists(html_file)
|
||||
|
||||
|
||||
def test_router_paths_to_string_conditions():
|
||||
"""Test that router paths correctly connect to listeners with string conditions."""
|
||||
def test_router_events_to_string_conditions():
|
||||
"""Test that router events correctly connect to listeners with string conditions."""
|
||||
|
||||
class RouterToStringFlow(Flow):
|
||||
@start()
|
||||
@@ -349,25 +388,34 @@ def test_router_paths_to_string_conditions():
|
||||
@router(init)
|
||||
def decide(self):
|
||||
if hasattr(self, "state") and self.state.get("path") == "b":
|
||||
return "path_b"
|
||||
return "path_a"
|
||||
return "event_b"
|
||||
return "event_a"
|
||||
|
||||
@listen(or_("path_a", "path_b"))
|
||||
@listen(or_("event_a", "event_b"))
|
||||
def handle_either(self):
|
||||
return "handled"
|
||||
|
||||
@listen("path_b")
|
||||
@listen("event_b")
|
||||
def handle_b_only(self):
|
||||
return "handled_b"
|
||||
|
||||
flow = RouterToStringFlow()
|
||||
_attach_flow_definition(
|
||||
RouterToStringFlow,
|
||||
{
|
||||
"init": {"start": True},
|
||||
"decide": {"listen": "init", "router": True, "emit": ["event_a", "event_b"]},
|
||||
"handle_either": {"listen": {"or": ["event_a", "event_b"]}},
|
||||
"handle_b_only": {"listen": "event_b"},
|
||||
},
|
||||
)
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
decide_node = structure["nodes"]["decide"]
|
||||
assert "path_a" in decide_node["router_paths"]
|
||||
assert "path_b" in decide_node["router_paths"]
|
||||
assert "event_a" in decide_node["router_events"]
|
||||
assert "event_b" in decide_node["router_events"]
|
||||
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_event"]]
|
||||
|
||||
assert len(router_edges) == 3
|
||||
|
||||
@@ -382,8 +430,8 @@ def test_router_paths_to_string_conditions():
|
||||
assert len(edges_to_handle_b_only) == 1
|
||||
|
||||
|
||||
def test_router_paths_not_in_and_conditions():
|
||||
"""Test that router paths don't create edges to AND-nested conditions."""
|
||||
def test_router_events_not_in_and_conditions():
|
||||
"""Test that router events don't create edges to AND-nested conditions."""
|
||||
|
||||
class RouterAndConditionFlow(Flow):
|
||||
@start()
|
||||
@@ -392,24 +440,34 @@ def test_router_paths_not_in_and_conditions():
|
||||
|
||||
@router(init)
|
||||
def decide(self):
|
||||
return "path_a"
|
||||
return "event_a"
|
||||
|
||||
@listen("path_a")
|
||||
@listen("event_a")
|
||||
def step_1(self):
|
||||
return "step_1_done"
|
||||
|
||||
@listen(and_("path_a", step_1))
|
||||
@listen(and_("event_a", step_1))
|
||||
def step_2_and(self):
|
||||
return "step_2_done"
|
||||
|
||||
@listen(or_(and_("path_a", step_1), "path_a"))
|
||||
@listen(or_(and_("event_a", step_1), "event_a"))
|
||||
def step_3_or(self):
|
||||
return "step_3_done"
|
||||
|
||||
flow = RouterAndConditionFlow()
|
||||
_attach_flow_definition(
|
||||
RouterAndConditionFlow,
|
||||
{
|
||||
"init": {"start": True},
|
||||
"decide": {"listen": "init", "router": True, "emit": ["event_a"]},
|
||||
"step_1": {"listen": "event_a"},
|
||||
"step_2_and": {"listen": {"and": ["event_a", "step_1"]}},
|
||||
"step_3_or": {"listen": {"or": [{"and": ["event_a", "step_1"]}, "event_a"]}},
|
||||
},
|
||||
)
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_event"]]
|
||||
|
||||
targets = [edge["target"] for edge in router_edges]
|
||||
|
||||
@@ -454,6 +512,17 @@ def test_chained_routers_no_self_loops():
|
||||
return "need_auth"
|
||||
|
||||
flow = ChainedRouterFlow()
|
||||
_attach_flow_definition(
|
||||
ChainedRouterFlow,
|
||||
{
|
||||
"entrance": {"start": True},
|
||||
"session_in_cache": {"listen": "entrance", "router": True, "emit": ["exp"]},
|
||||
"check_exp": {"listen": "exp", "router": True, "emit": ["auth"]},
|
||||
"call_ai_auth": {"listen": "auth", "router": True, "emit": ["action"]},
|
||||
"forward_to_action": {"listen": "action"},
|
||||
"forward_to_authenticate": {"listen": "authenticate"},
|
||||
},
|
||||
)
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
for edge in structure["edges"]:
|
||||
@@ -461,13 +530,13 @@ def test_chained_routers_no_self_loops():
|
||||
f"Self-loop detected: {edge['source']} -> {edge['target']}"
|
||||
)
|
||||
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_event"]]
|
||||
|
||||
# session_in_cache -> check_exp (via 'exp')
|
||||
exp_edges = [
|
||||
edge
|
||||
for edge in router_edges
|
||||
if edge["router_path_label"] == "exp" and edge["source"] == "session_in_cache"
|
||||
if edge["router_event"] == "exp" and edge["source"] == "session_in_cache"
|
||||
]
|
||||
assert len(exp_edges) == 1
|
||||
assert exp_edges[0]["target"] == "check_exp"
|
||||
@@ -476,7 +545,7 @@ def test_chained_routers_no_self_loops():
|
||||
auth_edges = [
|
||||
edge
|
||||
for edge in router_edges
|
||||
if edge["router_path_label"] == "auth" and edge["source"] == "check_exp"
|
||||
if edge["router_event"] == "auth" and edge["source"] == "check_exp"
|
||||
]
|
||||
assert len(auth_edges) == 1
|
||||
assert auth_edges[0]["target"] == "call_ai_auth"
|
||||
@@ -485,7 +554,7 @@ def test_chained_routers_no_self_loops():
|
||||
action_edges = [
|
||||
edge
|
||||
for edge in router_edges
|
||||
if edge["router_path_label"] == "action" and edge["source"] == "call_ai_auth"
|
||||
if edge["router_event"] == "action" and edge["source"] == "call_ai_auth"
|
||||
]
|
||||
assert len(action_edges) == 1
|
||||
assert action_edges[0]["target"] == "forward_to_action"
|
||||
@@ -523,6 +592,16 @@ def test_routers_with_shared_output_strings():
|
||||
return "skipped"
|
||||
|
||||
flow = SharedOutputRouterFlow()
|
||||
_attach_flow_definition(
|
||||
SharedOutputRouterFlow,
|
||||
{
|
||||
"start": {"start": True},
|
||||
"router_a": {"listen": "start", "router": True, "emit": ["auth"]},
|
||||
"router_b": {"listen": "auth", "router": True, "emit": ["done"]},
|
||||
"finalize": {"listen": "done"},
|
||||
"handle_skip": {"listen": "skip"},
|
||||
},
|
||||
)
|
||||
structure = build_flow_structure(flow)
|
||||
|
||||
for edge in structure["edges"]:
|
||||
@@ -531,11 +610,11 @@ def test_routers_with_shared_output_strings():
|
||||
)
|
||||
|
||||
# router_a should connect to router_b via 'auth'
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
|
||||
router_edges = [edge for edge in structure["edges"] if edge["is_router_event"]]
|
||||
auth_from_a = [
|
||||
edge
|
||||
for edge in router_edges
|
||||
if edge["source"] == "router_a" and edge["router_path_label"] == "auth"
|
||||
if edge["source"] == "router_a" and edge["router_event"] == "auth"
|
||||
]
|
||||
assert len(auth_from_a) == 1
|
||||
assert auth_from_a[0]["target"] == "router_b"
|
||||
@@ -544,17 +623,17 @@ def test_routers_with_shared_output_strings():
|
||||
done_from_b = [
|
||||
edge
|
||||
for edge in router_edges
|
||||
if edge["source"] == "router_b" and edge["router_path_label"] == "done"
|
||||
if edge["source"] == "router_b" and edge["router_event"] == "done"
|
||||
]
|
||||
assert len(done_from_b) == 1
|
||||
assert done_from_b[0]["target"] == "finalize"
|
||||
|
||||
|
||||
def test_warning_for_router_without_paths(caplog):
|
||||
"""Test that a warning is logged when a router has no determinable paths."""
|
||||
def test_warning_for_router_without_events(caplog):
|
||||
"""Test that a warning is logged when a router has no determinable events."""
|
||||
import logging
|
||||
|
||||
class RouterWithoutPathsFlow(Flow):
|
||||
class RouterWithoutEventsFlow(Flow):
|
||||
"""Flow with a router that returns a dynamic value."""
|
||||
|
||||
@start()
|
||||
@@ -564,34 +643,35 @@ def test_warning_for_router_without_paths(caplog):
|
||||
@router(begin)
|
||||
def dynamic_router(self):
|
||||
import random
|
||||
return random.choice(["path_a", "path_b"])
|
||||
return random.choice(["event_a", "event_b"])
|
||||
|
||||
@listen("path_a")
|
||||
@listen("event_a")
|
||||
def handle_a(self):
|
||||
return "a"
|
||||
|
||||
@listen("path_b")
|
||||
@listen("event_b")
|
||||
def handle_b(self):
|
||||
return "b"
|
||||
|
||||
flow = RouterWithoutPathsFlow()
|
||||
flow = RouterWithoutEventsFlow()
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
build_flow_structure(flow)
|
||||
|
||||
assert any(
|
||||
"Could not determine return paths for router 'dynamic_router'" in record.message
|
||||
"Router events for 'dynamic_router' are dynamic" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
|
||||
assert any(
|
||||
"Found listeners waiting for triggers" in record.message
|
||||
"Static visualization could not match listener triggers" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
assert not any(record.levelno >= logging.ERROR for record in caplog.records)
|
||||
|
||||
|
||||
def test_warning_for_orphaned_listeners(caplog):
|
||||
"""Test that an error is logged when listeners wait for triggers no router outputs."""
|
||||
"""Test that a warning is logged when a trigger has no explicit router output."""
|
||||
import logging
|
||||
from typing import Literal
|
||||
|
||||
@@ -615,19 +695,33 @@ def test_warning_for_orphaned_listeners(caplog):
|
||||
return "orphan"
|
||||
|
||||
flow = OrphanedListenerFlow()
|
||||
_attach_flow_definition(
|
||||
OrphanedListenerFlow,
|
||||
{
|
||||
"begin": {"start": True},
|
||||
"my_router": {
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["option_a", "option_b"],
|
||||
},
|
||||
"handle_a": {"listen": "option_a"},
|
||||
"handle_orphan": {"listen": "option_c"},
|
||||
},
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.ERROR):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
build_flow_structure(flow)
|
||||
|
||||
assert any(
|
||||
"Found listeners waiting for triggers" in record.message
|
||||
"Static visualization could not match listener triggers" in record.message
|
||||
and "option_c" in record.message
|
||||
for record in caplog.records
|
||||
)
|
||||
assert not any(record.levelno >= logging.ERROR for record in caplog.records)
|
||||
|
||||
|
||||
def test_no_warning_for_properly_typed_router(caplog):
|
||||
"""Test that no warning is logged when router has proper type annotations."""
|
||||
def test_no_warning_for_explicit_contract_router_events(caplog):
|
||||
"""Test no warning is logged when router events are declared in the contract."""
|
||||
import logging
|
||||
from typing import Literal
|
||||
|
||||
@@ -639,23 +733,39 @@ def test_no_warning_for_properly_typed_router(caplog):
|
||||
return "started"
|
||||
|
||||
@router(begin)
|
||||
def typed_router(self) -> Literal["path_a", "path_b"]:
|
||||
return "path_a"
|
||||
def typed_router(self) -> Literal["event_a", "event_b"]:
|
||||
return "event_a"
|
||||
|
||||
@listen("path_a")
|
||||
@listen("event_a")
|
||||
def handle_a(self):
|
||||
return "a"
|
||||
|
||||
@listen("path_b")
|
||||
@listen("event_b")
|
||||
def handle_b(self):
|
||||
return "b"
|
||||
|
||||
flow = ProperlyTypedRouterFlow()
|
||||
_attach_flow_definition(
|
||||
ProperlyTypedRouterFlow,
|
||||
{
|
||||
"begin": {"start": True},
|
||||
"typed_router": {
|
||||
"listen": "begin",
|
||||
"router": True,
|
||||
"emit": ["event_a", "event_b"],
|
||||
},
|
||||
"handle_a": {"listen": "event_a"},
|
||||
"handle_b": {"listen": "event_b"},
|
||||
},
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.WARNING):
|
||||
build_flow_structure(flow)
|
||||
|
||||
# No warnings should be logged
|
||||
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
|
||||
assert not any("Could not determine return paths" in msg for msg in warning_messages)
|
||||
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
|
||||
assert not any("Router events for" in msg for msg in warning_messages)
|
||||
assert not any(
|
||||
"Static visualization could not match listener triggers" in msg
|
||||
for msg in warning_messages
|
||||
)
|
||||
|
||||
@@ -13,7 +13,7 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.flow import Flow, human_feedback, listen, start
|
||||
from crewai.flow import Flow, human_feedback, listen, persist, start
|
||||
from crewai.flow.human_feedback import (
|
||||
HumanFeedbackConfig,
|
||||
HumanFeedbackResult,
|
||||
@@ -79,7 +79,7 @@ class TestHumanFeedbackValidation:
|
||||
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert test_method.__is_router__ is True
|
||||
assert test_method.__router_paths__ == ["approve", "reject"]
|
||||
assert test_method.__router_emit__ == ["approve", "reject"]
|
||||
|
||||
def test_valid_configuration_without_routing(self):
|
||||
"""Test that valid configuration without routing doesn't raise."""
|
||||
@@ -91,6 +91,22 @@ class TestHumanFeedbackValidation:
|
||||
assert hasattr(test_method, "__human_feedback_config__")
|
||||
assert not hasattr(test_method, "__is_router__") or not test_method.__is_router__
|
||||
|
||||
def test_persist_preserves_human_feedback_llm_attribute(self):
|
||||
"""Test @persist preserves the live LLM stashed by @human_feedback."""
|
||||
llm = object()
|
||||
|
||||
@persist()
|
||||
@human_feedback(
|
||||
message="Review this:",
|
||||
emit=["approve", "reject"],
|
||||
llm=llm,
|
||||
)
|
||||
def test_method(self):
|
||||
return "output"
|
||||
|
||||
assert hasattr(test_method, "_human_feedback_llm")
|
||||
assert test_method._human_feedback_llm is llm
|
||||
|
||||
|
||||
class TestHumanFeedbackConfig:
|
||||
"""Tests for HumanFeedbackConfig dataclass."""
|
||||
@@ -189,7 +205,7 @@ class TestDecoratorAttributePreservation:
|
||||
return "output"
|
||||
|
||||
assert review_method.__is_router__ is True
|
||||
assert review_method.__router_paths__ == ["approved", "rejected"]
|
||||
assert review_method.__router_emit__ == ["approved", "rejected"]
|
||||
|
||||
|
||||
class TestAsyncSupport:
|
||||
|
||||
@@ -778,14 +778,14 @@ class TestEdgeCases:
|
||||
class TestLLMConfigPreservation:
|
||||
"""Tests that LLM config is preserved through @human_feedback serialization.
|
||||
|
||||
PR #4970 introduced _hf_llm stashing so the live LLM object survives
|
||||
PR #4970 introduced _human_feedback_llm stashing so the live LLM object survives
|
||||
decorator wrapping for same-process resume. The serialization path
|
||||
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
|
||||
config for cross-process resume.
|
||||
"""
|
||||
|
||||
def test_hf_llm_stashed_on_wrapper_with_llm_instance(self):
|
||||
"""Test that passing an LLM instance stashes it on the wrapper as _hf_llm."""
|
||||
def test_human_feedback_llm_stashed_on_wrapper_with_llm_instance(self):
|
||||
"""Test that passing an LLM instance stashes it on the wrapper as _human_feedback_llm."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
@@ -801,11 +801,11 @@ class TestLLMConfigPreservation:
|
||||
return "content"
|
||||
|
||||
method = ConfigFlow.review
|
||||
assert hasattr(method, "_hf_llm"), "_hf_llm not found on wrapper"
|
||||
assert method._hf_llm is llm_instance, "_hf_llm is not the same object"
|
||||
assert hasattr(method, "_human_feedback_llm"), "_human_feedback_llm not found on wrapper"
|
||||
assert method._human_feedback_llm is llm_instance, "_human_feedback_llm is not the same object"
|
||||
|
||||
def test_hf_llm_preserved_on_listen_method(self):
|
||||
"""Test that _hf_llm is preserved when @human_feedback is on a @listen method."""
|
||||
def test_human_feedback_llm_preserved_on_listen_method(self):
|
||||
"""Test that _human_feedback_llm is preserved when @human_feedback is on a @listen method."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
|
||||
@@ -825,11 +825,11 @@ class TestLLMConfigPreservation:
|
||||
return "content"
|
||||
|
||||
method = ListenConfigFlow.review
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is llm_instance
|
||||
assert hasattr(method, "_human_feedback_llm")
|
||||
assert method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_hf_llm_accessible_on_instance(self):
|
||||
"""Test that _hf_llm survives Flow instantiation (bound method access)."""
|
||||
def test_human_feedback_llm_accessible_on_instance(self):
|
||||
"""Test that _human_feedback_llm survives Flow instantiation (bound method access)."""
|
||||
from crewai.llm import LLM
|
||||
|
||||
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
|
||||
@@ -846,8 +846,8 @@ class TestLLMConfigPreservation:
|
||||
|
||||
flow = InstanceFlow()
|
||||
instance_method = flow.review
|
||||
assert hasattr(instance_method, "_hf_llm")
|
||||
assert instance_method._hf_llm is llm_instance
|
||||
assert hasattr(instance_method, "_human_feedback_llm")
|
||||
assert instance_method._human_feedback_llm is llm_instance
|
||||
|
||||
def test_serialize_llm_preserves_config_fields(self):
|
||||
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
|
||||
|
||||
Reference in New Issue
Block a user