From 629f5d537b6a3eaf534ea5f5a77331ebe8bfcbe7 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 30 Jun 2026 19:42:01 -0700 Subject: [PATCH] Reject self-listening flow methods (#6405) A method that listens to its own name can re-trigger itself or collide with router events. Rejecting that definition keeps declarative and Python-authored flows aligned before kickoff. --- lib/crewai/src/crewai/flow/flow_definition.py | 21 ++++++++ lib/crewai/src/crewai/flow/skill.py | 2 +- .../templates/flow_definition_skill.md.j2 | 3 ++ lib/crewai/tests/test_flow.py | 42 ++++++++------- lib/crewai/tests/test_flow_definition.py | 54 +++++++++++++------ 5 files changed, 86 insertions(+), 36 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index c3f6517eb..6e41cbdd7 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -772,6 +772,15 @@ class FlowDefinition(BaseModel): _validate_step_name(method_name, field="Flow method names") return self + @model_validator(mode="after") + def _validate_trigger_namespace(self) -> FlowDefinition: + for method_name, method in self.methods.items(): + if _condition_references(method.listen, method_name): + raise ValueError( + f"methods.{method_name}.listen must not reference itself" + ) + return self + @model_validator(mode="after") def _validate_cel_expressions(self) -> FlowDefinition: for method_name, method in self.methods.items(): @@ -869,6 +878,18 @@ def _validate_step_list(steps: list[FlowEachStepDefinition], *, field: str) -> N seen.add(name) +def _condition_references(condition: FlowDefinitionCondition | None, name: str) -> bool: + if condition is None: + return False + if isinstance(condition, str): + return condition == name + return any( + _condition_references(child, name) + for key in ("and", "or") + for child in condition.get(key, []) + ) + + def _validate_action_cel( action: FlowActionDefinition, *, diff --git a/lib/crewai/src/crewai/flow/skill.py b/lib/crewai/src/crewai/flow/skill.py index b7489a3d4..766c93e57 100644 --- a/lib/crewai/src/crewai/flow/skill.py +++ b/lib/crewai/src/crewai/flow/skill.py @@ -162,7 +162,7 @@ MODEL_SPECS: tuple[ModelSpec, ...] = ( descriptions={ "do": "Single action object executed when this method runs.", "start": "Marks a start method. Use `true` for the normal entrypoint. String or map conditions are advanced trigger conditions; use them only when the user asks for event/condition-based starts.", - "listen": 'Trigger condition that runs this method after upstream events. A string target can be a method name or a router-emitted event name, and both live in the same trigger namespace. Map conditions are for `and`/`or` trigger composition, for example `{"and": ["validated", "processed"]}`.', + "listen": 'Trigger condition that runs this method after upstream events. A string target can be a method name or a router-emitted event name, and both live in the same trigger namespace. Methods must not listen to their own method name. Map conditions are for `and`/`or` trigger composition, for example `{"and": ["validated", "processed"]}`.', "router": "Whether the method output should be treated as the next event name. Router actions must return one event name string, with no surrounding explanation.", "emit": "Declared router events this method may emit. Each emitted event name should be unique and should not collide with method names.", }, diff --git a/lib/crewai/src/crewai/flow/templates/flow_definition_skill.md.j2 b/lib/crewai/src/crewai/flow/templates/flow_definition_skill.md.j2 index dd0bc7638..bd049dd6a 100644 --- a/lib/crewai/src/crewai/flow/templates/flow_definition_skill.md.j2 +++ b/lib/crewai/src/crewai/flow/templates/flow_definition_skill.md.j2 @@ -60,6 +60,7 @@ Pick the simplest action that does the job. - `state` is the initial shared data shape. Action results do not automatically merge into `state`. - Read method results with `outputs.method_name` after that method can run. - `listen` targets a method name or a router-emitted event name. +- Methods must not listen to their own method name. - Method names and emitted event names share one namespace. Avoid reusing the same string for both unless the user explicitly wants that. - Use `router: true` plus `emit` when one method chooses between named branches. - A router action must return exactly one emitted event string. It must not return JSON, a list, or an explanation. @@ -127,6 +128,7 @@ Dynamic value rules: - Do not put more than one action under a method's `do`. - Do not make `do` a list. - Do not reference `outputs.some_method` before `some_method` can run. +- Do not set a method's `listen` to its own method name. - Do not use the same string for an emitted event and a method name unless the user asks for it. - Do not use `emit` without `router: true`{% if include_hitl %} or `human_feedback.emit`{% endif %}. - Do not rely on crew action-level `inputs` alone to ground agent behavior. Inputs that do not match placeholders are effectively unused by the prompt. @@ -195,6 +197,7 @@ Allowed shapes: - A method has exactly one `do` action object with one `call` discriminator. - `listen` targets method names and router-emitted event names in one shared namespace. +- Methods cannot listen to their own method name. - A router method result must match one declared `emit` value. - Crew action-level `inputs` are the Crew kickoff inputs; use CEL-wrapped strings there for runtime values. - Crew agent/task interpolation uses `{name}` placeholders from evaluated crew inputs. diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 4b8a66671..06a420c4d 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -2144,14 +2144,7 @@ def test_cyclic_flow_works_with_persist_and_id_input(): @pytest.mark.timeout(5) -def test_self_listening_method_does_not_loop(): - """A method whose @listen label matches its own name must not loop forever. - - Without the guard, 'process' re-triggers itself on every completion, - running indefinitely (timeout → FAIL). The fix caps method calls - and raises RecursionError (PASS). - """ - +def test_self_listening_method_is_rejected(): class SelfListenFlow(Flow): @start() def begin(self): @@ -2165,15 +2158,11 @@ def test_self_listening_method_does_not_loop(): def process(self): pass - flow = SelfListenFlow() - with pytest.raises(RecursionError, match="infinite loop"): - flow.kickoff() + with pytest.raises(ValueError, match="methods.process.listen"): + SelfListenFlow.flow_definition() -def test_or_condition_self_listen_fires_once(): - """or_() with a self-referencing label only fires once due to or_() guard.""" - call_count = 0 - +def test_or_condition_self_listen_is_rejected(): class OrSelfListenFlow(Flow): @start() def begin(self): @@ -2185,12 +2174,25 @@ def test_or_condition_self_listen_fires_once(): @listen(or_("other_trigger", "process")) def process(self): - nonlocal call_count - call_count += 1 + pass + + with pytest.raises(ValueError, match="methods.process.listen"): + OrSelfListenFlow.flow_definition() + + +def test_router_self_listening_method_is_rejected(): + class RouterSelfListenFlow(Flow): + @start() + def begin(self): + return "route" + + @router("route") + def route(self): + return "done" + + with pytest.raises(ValueError, match="methods.route.listen"): + RouterSelfListenFlow.flow_definition() - flow = OrSelfListenFlow() - flow.kickoff() - assert call_count == 1 class ListState(BaseModel): items: list = [] diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 0853025bd..a6c040afa 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -625,7 +625,7 @@ def test_flow_definition_from_declaration_accepts_json_and_yaml_strings(): return "left" @listen("left") - def left(self): + def handle_left(self): return "left" expected = RoundTripFlow.flow_definition() @@ -650,11 +650,11 @@ def test_flow_definition_from_declaration_accepts_json_and_yaml_strings(): "ref": "test_flow_definition:RoundTripFlow.decide" } }, - "left": { + "handle_left": { "listen": "left", "do": { "call": "code", - "ref": "test_flow_definition:RoundTripFlow.left" + "ref": "test_flow_definition:RoundTripFlow.handle_left" } } } @@ -675,11 +675,11 @@ def test_flow_definition_from_declaration_accepts_json_and_yaml_strings(): do: call: code ref: test_flow_definition:RoundTripFlow.decide - left: + handle_left: listen: left do: call: code - ref: test_flow_definition:RoundTripFlow.left + ref: test_flow_definition:RoundTripFlow.handle_left """, ] @@ -946,11 +946,11 @@ def test_flow_definition_infers_literal_router_emit(): return "left" @listen("left") - def left(self): + def handle_left(self): return "left" @listen("right") - def right(self): + def handle_right(self): return "right" definition = LiteralRouterFlow.flow_definition() @@ -973,11 +973,11 @@ def test_flow_definition_infers_enum_router_emit(): return Decision.APPROVE @listen("approve") - def approve(self): + def handle_approve(self): return "approve" @listen("reject") - def reject(self): + def handle_reject(self): return "reject" definition = EnumRouterFlow.flow_definition() @@ -996,11 +996,11 @@ def test_flow_definition_infers_literal_union_router_emit(): return "left" @listen("left") - def left(self): + def handle_left(self): return "left" @listen("right") - def right(self): + def handle_right(self): return "right" definition = LiteralUnionRouterFlow.flow_definition() @@ -1054,7 +1054,7 @@ def test_flow_definition_does_not_infer_unannotated_router_body_emit(): return "left" @listen("left") - def left(self): + def handle_left(self): return "left" definition = UnannotatedRouterFlow.flow_definition() @@ -1073,11 +1073,11 @@ def test_flow_definition_accepts_explicit_router_events(): return self.state["dynamic_event"] @listen("left") - def left(self): + def handle_left(self): return "left" @listen("right") - def right(self): + def handle_right(self): return "right" definition = ExplicitRouterFlow.flow_definition() @@ -1149,7 +1149,7 @@ def test_router_human_feedback_preserves_existing_router_metadata(): return "approved" @listen("approved") - def approved(self): + def handle_approved(self): return "approved" definition = RouterHumanFeedbackFlow.flow_definition() @@ -1214,6 +1214,30 @@ def test_static_string_listener_is_allowed_by_contract(): assert definition.methods["handle"].listen == "begni" +@pytest.mark.parametrize("listen", ["publish", {"or": ["publish", "revise"]}]) +@pytest.mark.parametrize("router_enabled", [False, True]) +def test_flow_definition_rejects_method_self_listen(listen, router_enabled): + with pytest.raises(ValueError, match="methods.publish.listen"): + flow_definition.FlowDefinition.from_declaration(contents= + { + "schema": "crewai.flow/v1", + "name": "SelfListenFlow", + "methods": { + "begin": { + "do": {"ref": "loaded_flows:SelfListenFlow.begin"}, + "start": True, + }, + "publish": { + "do": {"ref": "loaded_flows:SelfListenFlow.publish"}, + "listen": listen, + "router": router_enabled, + "emit": ["done"] if router_enabled else None, + }, + }, + } + ) + + def test_start_false_not_classified_as_start_method(): definition = flow_definition.FlowDefinition.from_declaration(contents= {