-
- Router Paths
+
+ Router Events
- ${uniqueRouterPaths.map((p) => `- ${p}
`).join("")}
+ ${uniqueRouterEvents.map((eventName) => `- ${eventName}
`).join("")}
`;
@@ -1823,14 +1859,26 @@ class DrawerManager {
});
});
- const routerPathsTitle = this.elements.content.querySelector(
- ".router-paths-title[data-router-paths]",
+ const routerEventLinks = this.elements.content.querySelectorAll(
+ ".drawer-code-link[data-router-event]",
);
- if (routerPathsTitle) {
- routerPathsTitle.addEventListener("click", (e) => {
+ routerEventLinks.forEach((link) => {
+ link.addEventListener("click", (e) => {
e.preventDefault();
e.stopPropagation();
- this.triggeredByHighlighter.highlightAllRouterPaths();
+ const routerEvent = link.getAttribute("data-router-event");
+ this.triggeredByHighlighter.highlightRouterEvent(routerEvent);
+ });
+ });
+
+ const routerEventsTitle = this.elements.content.querySelector(
+ ".router-events-title[data-router-events]",
+ );
+ if (routerEventsTitle) {
+ routerEventsTitle.addEventListener("click", (e) => {
+ e.preventDefault();
+ e.stopPropagation();
+ this.triggeredByHighlighter.highlightAllRouterEvents();
});
}
}
diff --git a/lib/crewai/src/crewai/flow/visualization/builder.py b/lib/crewai/src/crewai/flow/visualization/builder.py
index e277c1bbc..987eaf760 100644
--- a/lib/crewai/src/crewai/flow/visualization/builder.py
+++ b/lib/crewai/src/crewai/flow/visualization/builder.py
@@ -1,131 +1,118 @@
-"""Flow structure builder for analyzing Flow execution."""
+"""Flow structure builder for definition-only Flow visualization."""
from __future__ import annotations
from collections import defaultdict
-import inspect
import logging
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
-from crewai.flow.flow_wrappers import FlowCondition
-from crewai.flow.types import FlowMethodName
-from crewai.flow.utils import (
- is_flow_condition_dict,
- is_simple_flow_condition,
+from crewai.flow.flow_definition import (
+ FlowDefinition,
+ FlowDefinitionCondition,
+ FlowMethodDefinition,
)
-from crewai.flow.visualization.schema import extract_method_signature
from crewai.flow.visualization.types import FlowStructure, NodeMetadata, StructureEdge
logger = logging.getLogger(__name__)
+__all__ = ["build_flow_structure", "calculate_execution_paths"]
+
if TYPE_CHECKING:
from crewai.flow.flow import Flow
+def _definition_condition_items(
+ condition: dict[str, Any],
+ key: str,
+) -> list[FlowDefinitionCondition]:
+ return cast(list[FlowDefinitionCondition], condition.get(key, []))
+
+
+def _definition_condition_parts(
+ condition: dict[str, Any],
+) -> tuple[str, list[FlowDefinitionCondition]]:
+ if "and" in condition:
+ return AND_CONDITION, _definition_condition_items(condition, "and")
+ return OR_CONDITION, _definition_condition_items(condition, "or")
+
+
+def _condition_type_from_definition(
+ condition: FlowDefinitionCondition | None,
+) -> str | None:
+ if isinstance(condition, dict):
+ if "and" in condition:
+ return AND_CONDITION
+ if "or" in condition:
+ return OR_CONDITION
+ if isinstance(condition, str):
+ return OR_CONDITION
+ return None
+
+
+def _runtime_condition_from_definition(
+ condition: FlowDefinitionCondition,
+) -> str | dict[str, Any]:
+ if isinstance(condition, str):
+ return condition
+ condition_type, conditions = _definition_condition_parts(condition)
+ return {
+ "type": condition_type,
+ "conditions": [_runtime_condition_from_definition(item) for item in conditions],
+ }
+
+
+def _method_trigger_condition(
+ method_definition: FlowMethodDefinition,
+) -> FlowDefinitionCondition | None:
+ if method_definition.listen is not None:
+ return method_definition.listen
+ if isinstance(method_definition.start, str | dict):
+ return method_definition.start
+ return None
+
+
+def _method_router_events(method_definition: FlowMethodDefinition) -> list[str]:
+ if method_definition.human_feedback and method_definition.human_feedback.emit:
+ return [str(event) for event in method_definition.human_feedback.emit]
+ if method_definition.emit:
+ return [str(event) for event in method_definition.emit]
+ return []
+
+
def _extract_direct_or_triggers(
- condition: str | dict[str, Any] | list[Any] | FlowCondition,
+ condition: FlowDefinitionCondition,
) -> list[str]:
- """Extract direct OR-level trigger strings from a condition.
-
- This function extracts strings that would directly trigger a listener,
- meaning they appear at the top level of an OR condition. Strings nested
- inside AND conditions are NOT considered direct triggers for router paths.
-
- For example:
- - or_("a", "b") -> ["a", "b"] (both are direct triggers)
- - and_("a", "b") -> [] (neither are direct triggers, both required)
- - or_(and_("a", "b"), "c") -> ["c"] (only "c" is a direct trigger)
-
- Args:
- condition: Can be a string, dict, or list.
-
- Returns:
- List of direct OR-level trigger strings.
- """
if isinstance(condition, str):
return [condition]
- if isinstance(condition, dict):
- cond_type = condition.get("type", OR_CONDITION)
- conditions_list = condition.get("conditions", [])
-
- if cond_type == OR_CONDITION:
- strings = []
- for sub_cond in conditions_list:
- strings.extend(_extract_direct_or_triggers(sub_cond))
- return strings
+ condition_type, conditions = _definition_condition_parts(condition)
+ if condition_type == AND_CONDITION:
return []
- if isinstance(condition, list):
- strings = []
- for item in condition:
- strings.extend(_extract_direct_or_triggers(item))
- return strings
- if callable(condition) and hasattr(condition, "__name__"):
- return [condition.__name__]
- return []
+ strings: list[str] = []
+ for sub_condition in conditions:
+ strings.extend(_extract_direct_or_triggers(sub_condition))
+ return strings
def _extract_all_trigger_names(
- condition: str | dict[str, Any] | list[Any] | FlowCondition,
+ condition: FlowDefinitionCondition,
) -> list[str]:
- """Extract ALL trigger names from a condition for display purposes.
-
- Unlike _extract_direct_or_triggers, this extracts ALL strings and method
- names from the entire condition tree, including those nested in AND conditions.
- This is used for displaying trigger information in the UI.
-
- For example:
- - or_("a", "b") -> ["a", "b"]
- - and_("a", "b") -> ["a", "b"]
- - or_(and_("a", method_6), method_4) -> ["a", "method_6", "method_4"]
-
- Args:
- condition: Can be a string, dict, or list.
-
- Returns:
- List of all trigger names found in the condition.
- """
if isinstance(condition, str):
return [condition]
- if isinstance(condition, dict):
- conditions_list = condition.get("conditions", [])
- strings = []
- for sub_cond in conditions_list:
- strings.extend(_extract_all_trigger_names(sub_cond))
- return strings
- if isinstance(condition, list):
- strings = []
- for item in condition:
- strings.extend(_extract_all_trigger_names(item))
- return strings
- if callable(condition) and hasattr(condition, "__name__"):
- return [condition.__name__]
- return []
+ _, conditions = _definition_condition_parts(condition)
+ strings: list[str] = []
+ for sub_condition in conditions:
+ strings.extend(_extract_all_trigger_names(sub_condition))
+ return strings
def _create_edges_from_condition(
- condition: str | dict[str, Any] | list[Any] | FlowCondition,
+ condition: FlowDefinitionCondition,
target: str,
nodes: dict[str, NodeMetadata],
) -> list[StructureEdge]:
- """Create edges from a condition tree, preserving AND/OR semantics.
-
- This function recursively processes the condition tree and creates edges
- with the appropriate condition_type for each trigger.
-
- For AND conditions, all triggers get edges with condition_type="AND".
- For OR conditions, triggers get edges with condition_type="OR".
-
- Args:
- condition: The condition tree (string, dict, or list).
- target: The target node name.
- nodes: Dictionary of all nodes for validation.
-
- Returns:
- List of StructureEdge objects representing the condition.
- """
edges: list[StructureEdge] = []
if isinstance(condition, str):
@@ -135,24 +122,11 @@ def _create_edges_from_condition(
source=condition,
target=target,
condition_type=OR_CONDITION,
- is_router_path=False,
- )
- )
- elif callable(condition) and hasattr(condition, "__name__"):
- method_name = condition.__name__
- if method_name in nodes:
- edges.append(
- StructureEdge(
- source=method_name,
- target=target,
- condition_type=OR_CONDITION,
- is_router_path=False,
+ is_router_event=False,
)
)
elif isinstance(condition, dict):
- cond_type = condition.get("type", OR_CONDITION)
- conditions_list = condition.get("conditions", [])
-
+ cond_type, conditions = _definition_condition_parts(condition)
if cond_type == AND_CONDITION:
triggers = _extract_all_trigger_names(condition)
edges.extend(
@@ -160,277 +134,144 @@ def _create_edges_from_condition(
source=trigger,
target=target,
condition_type=AND_CONDITION,
- is_router_path=False,
+ is_router_event=False,
)
for trigger in triggers
if trigger in nodes
)
else:
- for sub_cond in conditions_list:
- edges.extend(_create_edges_from_condition(sub_cond, target, nodes))
- elif isinstance(condition, list):
- for item in condition:
- edges.extend(_create_edges_from_condition(item, target, nodes))
+ for sub_condition in conditions:
+ edges.extend(_create_edges_from_condition(sub_condition, target, nodes))
return edges
-def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
- """Build a structure representation of a Flow's execution.
+def _flow_definition_from(
+ flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
+) -> FlowDefinition:
+ if isinstance(flow_or_definition, FlowDefinition):
+ return flow_or_definition
- Args:
- flow: Flow instance to analyze.
+ flow_class = (
+ flow_or_definition
+ if isinstance(flow_or_definition, type)
+ else type(flow_or_definition)
+ )
+ flow_definition = getattr(flow_class, "flow_definition", None)
+ if callable(flow_definition):
+ return cast(FlowDefinition, flow_definition())
+ raise TypeError(
+ "build_flow_structure requires a FlowDefinition or a Flow class/instance "
+ "with flow_definition()."
+ )
- Returns:
- Dictionary with nodes, edges, start_methods, and router_methods.
- """
+
+def build_flow_structure(
+ flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
+) -> FlowStructure:
+ """Build a visualization structure projection from a FlowDefinition."""
+ definition = _flow_definition_from(flow_or_definition)
nodes: dict[str, NodeMetadata] = {}
edges: list[StructureEdge] = []
start_methods: list[str] = []
router_methods: list[str] = []
- for method_name, method in flow._methods.items():
- node_metadata: NodeMetadata = {"type": "listen"}
+ for method_name, method_definition in definition.methods.items():
+ node_metadata: NodeMetadata = {"type": "listen", "class_name": definition.name}
- if hasattr(method, "__is_start_method__") and method.__is_start_method__:
+ if method_definition.is_start:
node_metadata["type"] = "start"
start_methods.append(method_name)
- if hasattr(method, "__is_router__") and method.__is_router__:
+ if method_definition.router:
node_metadata["is_router"] = True
node_metadata["type"] = "router"
router_methods.append(method_name)
+ router_events = _method_router_events(method_definition)
+ if router_events:
+ node_metadata["router_events"] = router_events
- if method_name in flow._router_paths:
- node_metadata["router_paths"] = [
- str(p) for p in flow._router_paths[method_name]
- ]
-
- if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
- node_metadata["trigger_methods"] = [
- str(m) for m in method.__trigger_methods__
- ]
-
- if hasattr(method, "__condition_type__") and method.__condition_type__:
- node_metadata["trigger_condition_type"] = method.__condition_type__
- if "condition_type" not in node_metadata:
- node_metadata["condition_type"] = method.__condition_type__
+ trigger_condition = _method_trigger_condition(method_definition)
+ condition_type = _condition_type_from_definition(trigger_condition)
+ if condition_type is not None and trigger_condition is not None:
+ node_metadata["trigger_condition_type"] = condition_type
+ node_metadata["condition_type"] = condition_type
+ extracted = _extract_all_trigger_names(trigger_condition)
+ if extracted:
+ node_metadata["trigger_methods"] = extracted
+ runtime_condition = _runtime_condition_from_definition(trigger_condition)
+ if isinstance(runtime_condition, dict):
+ node_metadata["trigger_condition"] = runtime_condition
if node_metadata.get("is_router") and "condition_type" not in node_metadata:
node_metadata["condition_type"] = "IF"
- if (
- hasattr(method, "__trigger_condition__")
- and method.__trigger_condition__ is not None
- ):
- node_metadata["trigger_condition"] = method.__trigger_condition__
-
- if "trigger_methods" not in node_metadata:
- extracted = _extract_all_trigger_names(method.__trigger_condition__)
- if extracted:
- node_metadata["trigger_methods"] = extracted
-
- node_metadata["method_signature"] = extract_method_signature(
- method, method_name
- )
-
- try:
- source_code = inspect.getsource(method)
- node_metadata["source_code"] = source_code
-
- try:
- source_lines, start_line = inspect.getsourcelines(method)
- node_metadata["source_lines"] = source_lines
- node_metadata["source_start_line"] = start_line
- except (OSError, TypeError):
- pass
-
- try:
- source_file = inspect.getsourcefile(method)
- if source_file:
- node_metadata["source_file"] = source_file
- except (OSError, TypeError):
- try:
- class_file = inspect.getsourcefile(flow.__class__)
- if class_file:
- node_metadata["source_file"] = class_file
- except (OSError, TypeError):
- pass
- except (OSError, TypeError):
- pass
-
- try:
- class_obj = flow.__class__
-
- if class_obj:
- class_name = class_obj.__name__
-
- bases = class_obj.__bases__
- if bases:
- base_strs = []
- for base in bases:
- if hasattr(base, "__name__"):
- if hasattr(base, "__origin__"):
- base_strs.append(str(base))
- else:
- base_strs.append(base.__name__)
- else:
- base_strs.append(str(base))
-
- try:
- source_lines = inspect.getsource(class_obj).split("\n")
- _, class_start_line = inspect.getsourcelines(class_obj)
-
- for idx, line in enumerate(source_lines):
- stripped = line.strip()
- if stripped.startswith("class ") and class_name in stripped:
- class_signature = stripped.rstrip(":")
- node_metadata["class_signature"] = class_signature
- node_metadata["class_line_number"] = (
- class_start_line + idx
- )
- break
- except (OSError, TypeError):
- class_signature = f"class {class_name}({', '.join(base_strs)})"
- node_metadata["class_signature"] = class_signature
- else:
- class_signature = f"class {class_name}"
- node_metadata["class_signature"] = class_signature
-
- node_metadata["class_name"] = class_name
- except (OSError, TypeError, AttributeError):
- pass
-
nodes[method_name] = node_metadata
- for listener_name, condition_data in flow._listeners.items():
- if listener_name in router_methods:
+ for method_name, method_definition in definition.methods.items():
+ trigger_condition = _method_trigger_condition(method_definition)
+ if trigger_condition is None:
continue
-
- if is_simple_flow_condition(condition_data):
- cond_type, methods = condition_data
- edges.extend(
- StructureEdge(
- source=str(trigger_method),
- target=str(listener_name),
- condition_type=cond_type,
- is_router_path=False,
- )
- for trigger_method in methods
- if str(trigger_method) in nodes
- )
- elif is_flow_condition_dict(condition_data):
- edges.extend(
- _create_edges_from_condition(condition_data, str(listener_name), nodes)
- )
-
- for method_name, node_metadata in nodes.items(): # type: ignore[assignment]
- if node_metadata.get("is_router") and "trigger_methods" in node_metadata:
- trigger_methods = node_metadata["trigger_methods"]
- condition_type = node_metadata.get("trigger_condition_type", OR_CONDITION)
-
- if "trigger_condition" in node_metadata:
- edges.extend(
- _create_edges_from_condition(
- node_metadata["trigger_condition"], # type: ignore[arg-type]
- method_name,
- nodes,
- )
- )
- else:
- edges.extend(
- StructureEdge(
- source=trigger_method,
- target=method_name,
- condition_type=condition_type,
- is_router_path=False,
- )
- for trigger_method in trigger_methods
- if trigger_method in nodes
- )
+ edges.extend(
+ _create_edges_from_condition(trigger_condition, method_name, nodes)
+ )
all_string_triggers: set[str] = set()
- for condition_data in flow._listeners.values():
- if is_simple_flow_condition(condition_data):
- _, methods = condition_data
- for m in methods:
- if str(m) not in nodes: # It's a string trigger, not a method name
- all_string_triggers.add(str(m))
- elif is_flow_condition_dict(condition_data):
- for trigger in _extract_direct_or_triggers(condition_data):
- if trigger not in nodes:
- all_string_triggers.add(trigger)
+ for method_definition in definition.methods.values():
+ trigger_condition = _method_trigger_condition(method_definition)
+ if trigger_condition is None:
+ continue
+ for trigger in _extract_direct_or_triggers(trigger_condition):
+ if trigger not in nodes:
+ all_string_triggers.add(trigger)
- all_router_outputs: set[str] = set()
+ all_router_events: set[str] = set()
for router_method_name in router_methods:
- if router_method_name not in flow._router_paths:
- flow._router_paths[FlowMethodName(router_method_name)] = []
+ router_events = _method_router_events(definition.methods[router_method_name])
+ if router_events and router_method_name in nodes:
+ nodes[router_method_name]["router_events"] = router_events
+ all_router_events.update(router_events)
- current_paths = flow._router_paths.get(FlowMethodName(router_method_name), [])
- if current_paths and router_method_name in nodes:
- nodes[router_method_name]["router_paths"] = [str(p) for p in current_paths]
- all_router_outputs.update(str(p) for p in current_paths)
-
- if not current_paths:
+ if not router_events:
logger.warning(
- f"Could not determine return paths for router '{router_method_name}'. "
- f"Add a return type annotation like "
- f"'-> Literal[\"path1\", \"path2\"]' or '-> YourEnum' "
- f"to enable proper flow visualization."
+ f"Router events for '{router_method_name}' are dynamic or not "
+ f"statically inferable; static visualization may omit event edges."
)
- orphaned_triggers = all_string_triggers - all_router_outputs
+ orphaned_triggers = all_string_triggers - all_router_events
if orphaned_triggers:
- logger.error(
- f"Found listeners waiting for triggers {orphaned_triggers} "
- f"but no router outputs these values explicitly. "
- f"If your router returns a non-static value, check that your router has proper return type annotations."
+ logger.warning(
+ f"Static visualization could not match listener triggers "
+ f"{orphaned_triggers} to explicit router events. "
+ f"Dynamic router values may still trigger these listeners at runtime."
)
for router_method_name in router_methods:
- if router_method_name not in flow._router_paths:
- continue
+ router_events = _method_router_events(definition.methods[router_method_name])
- router_paths = flow._router_paths[FlowMethodName(router_method_name)]
-
- for path in router_paths:
- for listener_name, condition_data in flow._listeners.items():
+ for event in router_events:
+ for listener_name, method_definition in definition.methods.items():
if listener_name == router_method_name:
continue
- trigger_strings_from_cond: list[str] = []
+ trigger_condition = _method_trigger_condition(method_definition)
+ if trigger_condition is None:
+ continue
+ trigger_strings_from_cond = _extract_direct_or_triggers(
+ trigger_condition
+ )
- if is_simple_flow_condition(condition_data):
- _, methods = condition_data
- trigger_strings_from_cond = [str(m) for m in methods]
- elif is_flow_condition_dict(condition_data):
- trigger_strings_from_cond = _extract_direct_or_triggers(
- condition_data
- )
-
- if str(path) in trigger_strings_from_cond:
+ if str(event) in trigger_strings_from_cond:
edges.append(
StructureEdge(
source=router_method_name,
- target=str(listener_name),
+ target=listener_name,
condition_type=None,
- is_router_path=True,
- router_path_label=str(path),
+ is_router_event=True,
+ router_event=str(event),
)
)
- for start_method in flow._start_methods:
- if start_method not in nodes and start_method in flow._methods:
- method = flow._methods[start_method]
- nodes[str(start_method)] = NodeMetadata(type="start")
-
- if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
- nodes[str(start_method)]["trigger_methods"] = [
- str(m) for m in method.__trigger_methods__
- ]
- if hasattr(method, "__condition_type__") and method.__condition_type__:
- nodes[str(start_method)]["condition_type"] = method.__condition_type__
-
return FlowStructure(
nodes=nodes,
edges=edges,
@@ -453,7 +294,7 @@ def calculate_execution_paths(structure: FlowStructure) -> int:
graph[edge["source"]].append(
{
"target": edge["target"],
- "is_router": edge["is_router_path"],
+ "is_router": edge["is_router_event"],
"condition": edge["condition_type"],
}
)
@@ -466,15 +307,6 @@ def calculate_execution_paths(structure: FlowStructure) -> int:
return 0
def count_paths_from(node: str, visited: set[str]) -> int:
- """Recursively count execution paths from a given node.
-
- Args:
- node: Node name to start counting from.
- visited: Set of already visited nodes to prevent cycles.
-
- Returns:
- Number of execution paths from this node to terminal nodes.
- """
if node in terminal_nodes:
return 1
diff --git a/lib/crewai/src/crewai/flow/visualization/renderers/interactive.py b/lib/crewai/src/crewai/flow/visualization/renderers/interactive.py
index 88242bea6..0ad8943f1 100644
--- a/lib/crewai/src/crewai/flow/visualization/renderers/interactive.py
+++ b/lib/crewai/src/crewai/flow/visualization/renderers/interactive.py
@@ -309,18 +309,18 @@ def render_interactive(
""")
- if metadata.get("router_paths"):
- paths = metadata["router_paths"]
- paths_items = "".join(
+ if metadata.get("router_events"):
+ router_events = metadata["router_events"]
+ event_items = "".join(
[
f'