mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-08 12:08:15 +00:00
When a method has both @listen and @human_feedback(emit=[...]), the FlowMeta metaclass registered it as a router but only used get_possible_return_constants() to detect paths. This fails for @human_feedback methods since the paths come from the decorator's emit param, not from return statements in the source code. Now checks __router_paths__ first (set by @human_feedback), then falls back to source code analysis for plain @router methods. This was causing missing edges in the flow serializer output — e.g. the whitepaper generator's review_infographic -> handle_cancelled, send_slack_notification, classify_feedback edges were all missing. Adds test: @listen + @human_feedback(emit=[...]) generates correct router edges in serialized output. Co-authored-by: Joao Moura <joao@crewai.com>
848 lines
26 KiB
Python
848 lines
26 KiB
Python
"""Tests for flow_serializer.py - Flow structure serialization for Studio UI."""
|
|
|
|
from typing import Literal
|
|
|
|
import pytest
|
|
from pydantic import BaseModel, Field
|
|
|
|
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
|
from crewai.flow.flow_serializer import flow_structure
|
|
from crewai.flow.human_feedback import human_feedback
|
|
|
|
|
|
class TestSimpleLinearFlow:
|
|
"""Test simple linear flow (start → listen → listen)."""
|
|
|
|
def test_linear_flow_structure(self):
|
|
"""Test a simple sequential flow structure."""
|
|
|
|
class LinearFlow(Flow):
|
|
"""A simple linear flow for testing."""
|
|
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
@listen(begin)
|
|
def process(self):
|
|
return "processed"
|
|
|
|
@listen(process)
|
|
def finalize(self):
|
|
return "done"
|
|
|
|
structure = flow_structure(LinearFlow)
|
|
|
|
assert structure["name"] == "LinearFlow"
|
|
assert structure["description"] == "A simple linear flow for testing."
|
|
assert len(structure["methods"]) == 3
|
|
|
|
# Check method types
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["begin"]["type"] == "start"
|
|
assert method_map["process"]["type"] == "listen"
|
|
assert method_map["finalize"]["type"] == "listen"
|
|
|
|
# Check edges
|
|
assert len(structure["edges"]) == 2
|
|
|
|
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
|
|
assert ("begin", "process") in edge_pairs
|
|
assert ("process", "finalize") in edge_pairs
|
|
|
|
# All edges should be listen type
|
|
for edge in structure["edges"]:
|
|
assert edge["edge_type"] == "listen"
|
|
assert edge["condition"] is None
|
|
|
|
|
|
class TestRouterFlow:
|
|
"""Test flow with router branching."""
|
|
|
|
def test_router_flow_structure(self):
|
|
"""Test a flow with router that branches to different paths."""
|
|
|
|
class BranchingFlow(Flow):
|
|
@start()
|
|
def init(self):
|
|
return "initialized"
|
|
|
|
@router(init)
|
|
def decide(self) -> Literal["path_a", "path_b"]:
|
|
return "path_a"
|
|
|
|
@listen("path_a")
|
|
def handle_a(self):
|
|
return "handled_a"
|
|
|
|
@listen("path_b")
|
|
def handle_b(self):
|
|
return "handled_b"
|
|
|
|
structure = flow_structure(BranchingFlow)
|
|
|
|
assert structure["name"] == "BranchingFlow"
|
|
assert len(structure["methods"]) == 4
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
# Check method types
|
|
assert method_map["init"]["type"] == "start"
|
|
assert method_map["decide"]["type"] == "router"
|
|
assert method_map["handle_a"]["type"] == "listen"
|
|
assert method_map["handle_b"]["type"] == "listen"
|
|
|
|
# Check router paths
|
|
assert "path_a" in method_map["decide"]["router_paths"]
|
|
assert "path_b" in method_map["decide"]["router_paths"]
|
|
|
|
# Check edges
|
|
# Should have: init -> decide (listen), decide -> handle_a (route), decide -> handle_b (route)
|
|
listen_edges = [e for e in structure["edges"] if e["edge_type"] == "listen"]
|
|
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
|
|
|
|
assert len(listen_edges) == 1
|
|
assert listen_edges[0]["from_method"] == "init"
|
|
assert listen_edges[0]["to_method"] == "decide"
|
|
|
|
assert len(route_edges) == 2
|
|
route_targets = {e["to_method"] for e in route_edges}
|
|
assert "handle_a" in route_targets
|
|
assert "handle_b" in route_targets
|
|
|
|
# Check route conditions
|
|
route_conditions = {e["to_method"]: e["condition"] for e in route_edges}
|
|
assert route_conditions["handle_a"] == "path_a"
|
|
assert route_conditions["handle_b"] == "path_b"
|
|
|
|
|
|
class TestAndOrConditions:
|
|
"""Test flow with AND/OR conditions."""
|
|
|
|
def test_and_condition_flow(self):
|
|
"""Test a flow where a method waits for multiple methods (AND)."""
|
|
|
|
class AndConditionFlow(Flow):
|
|
@start()
|
|
def step_a(self):
|
|
return "a"
|
|
|
|
@start()
|
|
def step_b(self):
|
|
return "b"
|
|
|
|
@listen(and_(step_a, step_b))
|
|
def converge(self):
|
|
return "converged"
|
|
|
|
structure = flow_structure(AndConditionFlow)
|
|
|
|
assert len(structure["methods"]) == 3
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["step_a"]["type"] == "start"
|
|
assert method_map["step_b"]["type"] == "start"
|
|
assert method_map["converge"]["type"] == "listen"
|
|
|
|
# Check condition type
|
|
assert method_map["converge"]["condition_type"] == "AND"
|
|
|
|
# Check trigger methods
|
|
triggers = method_map["converge"]["trigger_methods"]
|
|
assert "step_a" in triggers
|
|
assert "step_b" in triggers
|
|
|
|
# Check edges - should have 2 edges to converge
|
|
converge_edges = [e for e in structure["edges"] if e["to_method"] == "converge"]
|
|
assert len(converge_edges) == 2
|
|
|
|
def test_or_condition_flow(self):
|
|
"""Test a flow where a method is triggered by any of multiple methods (OR)."""
|
|
|
|
class OrConditionFlow(Flow):
|
|
@start()
|
|
def path_1(self):
|
|
return "1"
|
|
|
|
@start()
|
|
def path_2(self):
|
|
return "2"
|
|
|
|
@listen(or_(path_1, path_2))
|
|
def handle_any(self):
|
|
return "handled"
|
|
|
|
structure = flow_structure(OrConditionFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["handle_any"]["condition_type"] == "OR"
|
|
|
|
triggers = method_map["handle_any"]["trigger_methods"]
|
|
assert "path_1" in triggers
|
|
assert "path_2" in triggers
|
|
|
|
|
|
class TestHumanFeedbackMethods:
|
|
"""Test flow with @human_feedback decorated methods."""
|
|
|
|
def test_human_feedback_detection(self):
|
|
"""Test that human feedback methods are correctly identified."""
|
|
|
|
class HumanFeedbackFlow(Flow):
|
|
@start()
|
|
@human_feedback(
|
|
message="Please review:",
|
|
emit=["approved", "rejected"],
|
|
llm="gpt-4o-mini",
|
|
)
|
|
def review_step(self):
|
|
return "content to review"
|
|
|
|
@listen("approved")
|
|
def handle_approved(self):
|
|
return "approved"
|
|
|
|
@listen("rejected")
|
|
def handle_rejected(self):
|
|
return "rejected"
|
|
|
|
structure = flow_structure(HumanFeedbackFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
# review_step should have human feedback
|
|
assert method_map["review_step"]["has_human_feedback"] is True
|
|
# It's a start+router (due to emit)
|
|
assert method_map["review_step"]["type"] == "start_router"
|
|
assert "approved" in method_map["review_step"]["router_paths"]
|
|
assert "rejected" in method_map["review_step"]["router_paths"]
|
|
|
|
# Other methods should not have human feedback
|
|
assert method_map["handle_approved"]["has_human_feedback"] is False
|
|
assert method_map["handle_rejected"]["has_human_feedback"] is False
|
|
|
|
def test_listen_plus_human_feedback_router_edges(self):
|
|
"""Test that @listen + @human_feedback(emit=...) generates router edges.
|
|
|
|
This is the pattern used in the whitepaper generator:
|
|
a listener method that also acts as a router via @human_feedback(emit=[...]).
|
|
The serializer must generate edges from this method to listeners of its emit paths.
|
|
"""
|
|
|
|
class ReviewFlow(Flow):
|
|
@start()
|
|
def generate(self):
|
|
return "content"
|
|
|
|
@listen(generate)
|
|
@human_feedback(
|
|
message="Review this:",
|
|
emit=["approved", "needs_changes", "cancelled"],
|
|
llm="gpt-4o-mini",
|
|
)
|
|
def review(self):
|
|
return "review result"
|
|
|
|
@listen("approved")
|
|
def handle_approved(self):
|
|
return "done"
|
|
|
|
@listen("needs_changes")
|
|
def handle_changes(self):
|
|
return "regenerating"
|
|
|
|
@listen("cancelled")
|
|
def handle_cancelled(self):
|
|
return "cancelled"
|
|
|
|
structure = flow_structure(ReviewFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
edge_set = {(e["from_method"], e["to_method"], e.get("condition")) for e in structure["edges"]}
|
|
|
|
# review should be detected as a router with the emit paths
|
|
assert method_map["review"]["type"] == "router"
|
|
assert set(method_map["review"]["router_paths"]) == {"approved", "needs_changes", "cancelled"}
|
|
assert method_map["review"]["has_human_feedback"] is True
|
|
|
|
# Should have listen edge: generate -> review
|
|
assert ("generate", "review", None) in edge_set
|
|
|
|
# Should have route edges from review to each listener
|
|
assert ("review", "handle_approved", "approved") in edge_set
|
|
assert ("review", "handle_changes", "needs_changes") in edge_set
|
|
assert ("review", "handle_cancelled", "cancelled") in edge_set
|
|
|
|
|
|
class TestCrewReferences:
|
|
"""Test detection of Crew references in method bodies."""
|
|
|
|
def test_crew_detection_with_crew_call(self):
|
|
"""Test that .crew() calls are detected."""
|
|
|
|
class FlowWithCrew(Flow):
|
|
@start()
|
|
def run_crew(self):
|
|
# Simulating crew usage pattern
|
|
# result = MyCrew().crew().kickoff()
|
|
return "result"
|
|
|
|
@listen(run_crew)
|
|
def no_crew(self):
|
|
return "done"
|
|
|
|
structure = flow_structure(FlowWithCrew)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
# Note: Since the actual .crew() call is in a comment/string,
|
|
# the detection might not trigger. In real code it would.
|
|
# We're testing the mechanism exists.
|
|
assert "has_crew" in method_map["run_crew"]
|
|
assert "has_crew" in method_map["no_crew"]
|
|
|
|
def test_no_crew_when_absent(self):
|
|
"""Test that methods without Crew refs return has_crew=False."""
|
|
|
|
class SimpleNonCrewFlow(Flow):
|
|
@start()
|
|
def calculate(self):
|
|
return 1 + 1
|
|
|
|
@listen(calculate)
|
|
def display(self):
|
|
return "result"
|
|
|
|
structure = flow_structure(SimpleNonCrewFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["calculate"]["has_crew"] is False
|
|
assert method_map["display"]["has_crew"] is False
|
|
|
|
|
|
class TestTypedStateSchema:
|
|
"""Test flow with typed Pydantic state."""
|
|
|
|
def test_pydantic_state_schema_extraction(self):
|
|
"""Test extracting state schema from a Flow with Pydantic state."""
|
|
|
|
class MyState(BaseModel):
|
|
counter: int = 0
|
|
message: str = ""
|
|
items: list[str] = Field(default_factory=list)
|
|
|
|
class TypedStateFlow(Flow[MyState]):
|
|
initial_state = MyState
|
|
|
|
@start()
|
|
def increment(self):
|
|
self.state.counter += 1
|
|
return self.state.counter
|
|
|
|
@listen(increment)
|
|
def display(self):
|
|
return f"Count: {self.state.counter}"
|
|
|
|
structure = flow_structure(TypedStateFlow)
|
|
|
|
assert structure["state_schema"] is not None
|
|
fields = structure["state_schema"]["fields"]
|
|
|
|
field_names = {f["name"] for f in fields}
|
|
assert "counter" in field_names
|
|
assert "message" in field_names
|
|
assert "items" in field_names
|
|
|
|
# Check types
|
|
field_map = {f["name"]: f for f in fields}
|
|
assert "int" in field_map["counter"]["type"]
|
|
assert "str" in field_map["message"]["type"]
|
|
|
|
# Check defaults
|
|
assert field_map["counter"]["default"] == 0
|
|
assert field_map["message"]["default"] == ""
|
|
|
|
def test_dict_state_returns_none(self):
|
|
"""Test that flows using dict state return None for state_schema."""
|
|
|
|
class DictStateFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
self.state["count"] = 1
|
|
return "started"
|
|
|
|
structure = flow_structure(DictStateFlow)
|
|
|
|
assert structure["state_schema"] is None
|
|
|
|
|
|
class TestEdgeCases:
|
|
"""Test edge cases and special scenarios."""
|
|
|
|
def test_start_router_combo(self):
|
|
"""Test a method that is both @start and a router (via human_feedback emit)."""
|
|
|
|
class StartRouterFlow(Flow):
|
|
@start()
|
|
@human_feedback(
|
|
message="Review:",
|
|
emit=["continue", "stop"],
|
|
llm="gpt-4o-mini",
|
|
)
|
|
def entry_point(self):
|
|
return "data"
|
|
|
|
@listen("continue")
|
|
def proceed(self):
|
|
return "proceeding"
|
|
|
|
@listen("stop")
|
|
def halt(self):
|
|
return "halted"
|
|
|
|
structure = flow_structure(StartRouterFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["entry_point"]["type"] == "start_router"
|
|
assert method_map["entry_point"]["has_human_feedback"] is True
|
|
assert "continue" in method_map["entry_point"]["router_paths"]
|
|
assert "stop" in method_map["entry_point"]["router_paths"]
|
|
|
|
def test_multiple_start_methods(self):
|
|
"""Test a flow with multiple start methods."""
|
|
|
|
class MultiStartFlow(Flow):
|
|
@start()
|
|
def start_a(self):
|
|
return "a"
|
|
|
|
@start()
|
|
def start_b(self):
|
|
return "b"
|
|
|
|
@listen(and_(start_a, start_b))
|
|
def combine(self):
|
|
return "combined"
|
|
|
|
structure = flow_structure(MultiStartFlow)
|
|
|
|
start_methods = [m for m in structure["methods"] if m["type"] == "start"]
|
|
assert len(start_methods) == 2
|
|
|
|
start_names = {m["name"] for m in start_methods}
|
|
assert "start_a" in start_names
|
|
assert "start_b" in start_names
|
|
|
|
def test_orphan_methods(self):
|
|
"""Test that orphan methods (not connected to flow) are still captured."""
|
|
|
|
class FlowWithOrphan(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
@listen(begin)
|
|
def connected(self):
|
|
return "connected"
|
|
|
|
@listen("never_triggered")
|
|
def orphan(self):
|
|
return "orphan"
|
|
|
|
structure = flow_structure(FlowWithOrphan)
|
|
|
|
method_names = {m["name"] for m in structure["methods"]}
|
|
assert "orphan" in method_names
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
assert method_map["orphan"]["trigger_methods"] == ["never_triggered"]
|
|
|
|
def test_empty_flow(self):
|
|
"""Test building structure for a flow with no methods."""
|
|
|
|
class EmptyFlow(Flow):
|
|
pass
|
|
|
|
structure = flow_structure(EmptyFlow)
|
|
|
|
assert structure["name"] == "EmptyFlow"
|
|
assert structure["methods"] == []
|
|
assert structure["edges"] == []
|
|
assert structure["state_schema"] is None
|
|
|
|
def test_flow_with_docstring(self):
|
|
"""Test that flow docstring is captured."""
|
|
|
|
class DocumentedFlow(Flow):
|
|
"""This is a well-documented flow.
|
|
|
|
It has multiple lines of documentation.
|
|
"""
|
|
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
structure = flow_structure(DocumentedFlow)
|
|
|
|
assert structure["description"] is not None
|
|
assert "well-documented flow" in structure["description"]
|
|
|
|
def test_flow_without_docstring(self):
|
|
"""Test that missing docstring returns None."""
|
|
|
|
class UndocumentedFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
structure = flow_structure(UndocumentedFlow)
|
|
|
|
assert structure["description"] is None
|
|
|
|
def test_nested_conditions(self):
|
|
"""Test flow with nested AND/OR conditions."""
|
|
|
|
class NestedConditionFlow(Flow):
|
|
@start()
|
|
def a(self):
|
|
return "a"
|
|
|
|
@start()
|
|
def b(self):
|
|
return "b"
|
|
|
|
@start()
|
|
def c(self):
|
|
return "c"
|
|
|
|
@listen(or_(and_(a, b), c))
|
|
def complex_trigger(self):
|
|
return "triggered"
|
|
|
|
structure = flow_structure(NestedConditionFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
# Should have triggers for a, b, and c
|
|
triggers = method_map["complex_trigger"]["trigger_methods"]
|
|
assert len(triggers) == 3
|
|
assert "a" in triggers
|
|
assert "b" in triggers
|
|
assert "c" in triggers
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Test error handling and validation."""
|
|
|
|
def test_instance_raises_type_error(self):
|
|
"""Test that passing an instance raises TypeError."""
|
|
|
|
class TestFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
flow_instance = TestFlow()
|
|
|
|
with pytest.raises(TypeError) as exc_info:
|
|
flow_structure(flow_instance)
|
|
|
|
assert "requires a Flow class, not an instance" in str(exc_info.value)
|
|
|
|
def test_non_class_raises_type_error(self):
|
|
"""Test that passing non-class raises TypeError."""
|
|
|
|
with pytest.raises(TypeError):
|
|
flow_structure("not a class")
|
|
|
|
with pytest.raises(TypeError):
|
|
flow_structure(123)
|
|
|
|
|
|
class TestEdgeGeneration:
|
|
"""Test edge generation in various scenarios."""
|
|
|
|
def test_all_edges_generated_correctly(self):
|
|
"""Verify all edges are correctly generated for a complex flow."""
|
|
|
|
class ComplexFlow(Flow):
|
|
@start()
|
|
def entry(self):
|
|
return "started"
|
|
|
|
@listen(entry)
|
|
def step_1(self):
|
|
return "step_1"
|
|
|
|
@router(step_1)
|
|
def branch(self) -> Literal["left", "right"]:
|
|
return "left"
|
|
|
|
@listen("left")
|
|
def left_path(self):
|
|
return "left_done"
|
|
|
|
@listen("right")
|
|
def right_path(self):
|
|
return "right_done"
|
|
|
|
@listen(or_(left_path, right_path))
|
|
def converge(self):
|
|
return "done"
|
|
|
|
structure = flow_structure(ComplexFlow)
|
|
|
|
# Build edge map for easier checking
|
|
edges = structure["edges"]
|
|
|
|
# Check listen edges
|
|
listen_edges = [(e["from_method"], e["to_method"]) for e in edges if e["edge_type"] == "listen"]
|
|
|
|
assert ("entry", "step_1") in listen_edges
|
|
assert ("step_1", "branch") in listen_edges
|
|
assert ("left_path", "converge") in listen_edges
|
|
assert ("right_path", "converge") in listen_edges
|
|
|
|
# Check route edges
|
|
route_edges = [(e["from_method"], e["to_method"], e["condition"]) for e in edges if e["edge_type"] == "route"]
|
|
|
|
assert ("branch", "left_path", "left") in route_edges
|
|
assert ("branch", "right_path", "right") in route_edges
|
|
|
|
def test_router_edge_conditions(self):
|
|
"""Test that router edge conditions are properly set."""
|
|
|
|
class RouterConditionFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "start"
|
|
|
|
@router(begin)
|
|
def route(self) -> Literal["option_1", "option_2", "option_3"]:
|
|
return "option_1"
|
|
|
|
@listen("option_1")
|
|
def handle_1(self):
|
|
return "1"
|
|
|
|
@listen("option_2")
|
|
def handle_2(self):
|
|
return "2"
|
|
|
|
@listen("option_3")
|
|
def handle_3(self):
|
|
return "3"
|
|
|
|
structure = flow_structure(RouterConditionFlow)
|
|
|
|
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
|
|
|
|
# Should have 3 route edges
|
|
assert len(route_edges) == 3
|
|
|
|
conditions = {e["to_method"]: e["condition"] for e in route_edges}
|
|
assert conditions["handle_1"] == "option_1"
|
|
assert conditions["handle_2"] == "option_2"
|
|
assert conditions["handle_3"] == "option_3"
|
|
|
|
|
|
class TestMethodTypeClassification:
|
|
"""Test method type classification."""
|
|
|
|
def test_all_method_types(self):
|
|
"""Test classification of all method types."""
|
|
|
|
class AllTypesFlow(Flow):
|
|
@start()
|
|
def start_only(self):
|
|
return "start"
|
|
|
|
@listen(start_only)
|
|
def listen_only(self):
|
|
return "listen"
|
|
|
|
@router(listen_only)
|
|
def router_only(self) -> Literal["path"]:
|
|
return "path"
|
|
|
|
@listen("path")
|
|
def after_router(self):
|
|
return "after"
|
|
|
|
@start()
|
|
@human_feedback(
|
|
message="Review",
|
|
emit=["yes", "no"],
|
|
llm="gpt-4o-mini",
|
|
)
|
|
def start_and_router(self):
|
|
return "data"
|
|
|
|
structure = flow_structure(AllTypesFlow)
|
|
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
|
|
assert method_map["start_only"]["type"] == "start"
|
|
assert method_map["listen_only"]["type"] == "listen"
|
|
assert method_map["router_only"]["type"] == "router"
|
|
assert method_map["after_router"]["type"] == "listen"
|
|
assert method_map["start_and_router"]["type"] == "start_router"
|
|
|
|
|
|
class TestInputDetection:
|
|
"""Test flow input detection."""
|
|
|
|
def test_inputs_list_exists(self):
|
|
"""Test that inputs list is always present."""
|
|
|
|
class SimpleFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "started"
|
|
|
|
structure = flow_structure(SimpleFlow)
|
|
|
|
assert "inputs" in structure
|
|
assert isinstance(structure["inputs"], list)
|
|
|
|
|
|
class TestJsonSerializable:
|
|
"""Test that output is JSON serializable."""
|
|
|
|
def test_structure_is_json_serializable(self):
|
|
"""Test that the entire structure can be JSON serialized."""
|
|
import json
|
|
|
|
class MyState(BaseModel):
|
|
value: int = 0
|
|
|
|
class SerializableFlow(Flow[MyState]):
|
|
"""Test flow for JSON serialization."""
|
|
|
|
initial_state = MyState
|
|
|
|
@start()
|
|
@human_feedback(
|
|
message="Review",
|
|
emit=["ok", "not_ok"],
|
|
llm="gpt-4o-mini",
|
|
)
|
|
def begin(self):
|
|
return "data"
|
|
|
|
@listen("ok")
|
|
def proceed(self):
|
|
return "done"
|
|
|
|
structure = flow_structure(SerializableFlow)
|
|
|
|
# Should not raise
|
|
json_str = json.dumps(structure)
|
|
assert json_str is not None
|
|
|
|
# Should round-trip
|
|
parsed = json.loads(json_str)
|
|
assert parsed["name"] == "SerializableFlow"
|
|
assert len(parsed["methods"]) > 0
|
|
|
|
|
|
class TestFlowInheritance:
|
|
"""Test flow inheritance scenarios."""
|
|
|
|
def test_child_flow_inherits_parent_methods(self):
|
|
"""Test that FlowB inheriting from FlowA includes methods from both.
|
|
|
|
Note: FlowMeta propagates methods but does NOT fully propagate the
|
|
_listeners registry from parent classes. This means edges defined
|
|
in the parent class (e.g., parent_start -> parent_process) may not
|
|
appear in the child's structure. This is a known FlowMeta limitation.
|
|
"""
|
|
|
|
class FlowA(Flow):
|
|
"""Parent flow with start method."""
|
|
|
|
@start()
|
|
def parent_start(self):
|
|
return "parent started"
|
|
|
|
@listen(parent_start)
|
|
def parent_process(self):
|
|
return "parent processed"
|
|
|
|
class FlowB(FlowA):
|
|
"""Child flow with additional methods."""
|
|
|
|
@listen(FlowA.parent_process)
|
|
def child_continue(self):
|
|
return "child continued"
|
|
|
|
@listen(child_continue)
|
|
def child_finalize(self):
|
|
return "child finalized"
|
|
|
|
structure = flow_structure(FlowB)
|
|
|
|
assert structure["name"] == "FlowB"
|
|
|
|
# Check all methods are present (from both parent and child)
|
|
method_names = {m["name"] for m in structure["methods"]}
|
|
assert "parent_start" in method_names
|
|
assert "parent_process" in method_names
|
|
assert "child_continue" in method_names
|
|
assert "child_finalize" in method_names
|
|
|
|
# Check method types
|
|
method_map = {m["name"]: m for m in structure["methods"]}
|
|
assert method_map["parent_start"]["type"] == "start"
|
|
assert method_map["parent_process"]["type"] == "listen"
|
|
assert method_map["child_continue"]["type"] == "listen"
|
|
assert method_map["child_finalize"]["type"] == "listen"
|
|
|
|
# Check edges defined in child class exist
|
|
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
|
|
assert ("parent_process", "child_continue") in edge_pairs
|
|
assert ("child_continue", "child_finalize") in edge_pairs
|
|
|
|
# KNOWN LIMITATION: Edges defined in parent class (parent_start -> parent_process)
|
|
# are NOT propagated to child's _listeners registry by FlowMeta.
|
|
# The edge (parent_start, parent_process) will NOT be in edge_pairs.
|
|
# This is a FlowMeta limitation, not a serializer bug.
|
|
|
|
def test_child_flow_can_override_parent_method(self):
|
|
"""Test that child can override parent methods."""
|
|
|
|
class BaseFlow(Flow):
|
|
@start()
|
|
def begin(self):
|
|
return "base begin"
|
|
|
|
@listen(begin)
|
|
def process(self):
|
|
return "base process"
|
|
|
|
class ExtendedFlow(BaseFlow):
|
|
@listen(BaseFlow.begin)
|
|
def process(self):
|
|
# Override parent's process method
|
|
return "extended process"
|
|
|
|
@listen(process)
|
|
def finalize(self):
|
|
return "extended finalize"
|
|
|
|
structure = flow_structure(ExtendedFlow)
|
|
|
|
method_names = {m["name"] for m in structure["methods"]}
|
|
assert "begin" in method_names
|
|
assert "process" in method_names
|
|
assert "finalize" in method_names
|
|
|
|
# Should have 3 methods total (not 4, since process is overridden)
|
|
assert len(structure["methods"]) == 3
|