fix: adjust executor listener value to avoid recursion (#4705)

* fix: adjust executor listener value to avoid recursion

* fix: clear call count to ensure zero state

* feat: expose max method call kwarg
This commit is contained in:
Greyson LaLonde
2026-03-04 13:47:22 -05:00
committed by GitHub
parent 3cc6516ae5
commit ea70976a5d
4 changed files with 75 additions and 7 deletions

View File

@@ -302,6 +302,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
max_method_calls=self.max_iter * 10,
)
self._flow_initialized = True
@@ -403,7 +404,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@listen("max_iterations_exceeded")
def force_final_answer(self) -> Literal["agent_finished"]:
"""Force agent to provide final answer when max iterations exceeded."""
formatted_answer = handle_max_iterations_exceeded(
@@ -655,11 +656,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "tool_result_is_final"
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
reasoning_message_post: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
self.state.messages.append(reasoning_message_post)
return "tool_completed"
@@ -886,9 +887,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
call_id, func_name, func_args = info
# Parse arguments
args_dict, parse_error = parse_tool_call_args(func_args, func_name, call_id)
parsed_args, parse_error = parse_tool_call_args(func_args, func_name, call_id)
if parse_error is not None:
return parse_error
args_dict: dict[str, Any] = parsed_args or {}
# Get agent_key for event tracking
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
@@ -1107,11 +1109,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
def check_max_iterations(
self,
) -> Literal[
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
"max_iterations_exceeded", "continue_reasoning", "continue_reasoning_native"
]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
return "max_iterations_exceeded"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"

View File

@@ -692,6 +692,7 @@ class FlowMeta(type):
condition_type = getattr(
attr_value, "__condition_type__", OR_CONDITION
)
if (
hasattr(attr_value, "__trigger_condition__")
and attr_value.__trigger_condition__ is not None
@@ -769,6 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: FlowPersistence | None = None,
tracing: bool | None = None,
suppress_flow_events: bool = False,
max_method_calls: int = 100,
**kwargs: Any,
) -> None:
"""Initialize a new Flow instance.
@@ -777,6 +779,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: Optional persistence backend for storing flow states
tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings
suppress_flow_events: Whether to suppress flow event emissions (internal use)
max_method_calls: Maximum times a single method can be called per execution before raising RecursionError
**kwargs: Additional state values to initialize or override
"""
# Initialize basic instance attributes
@@ -792,6 +795,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods: set[FlowMethodName] = (
set()
) # Track completed methods for reload
self._method_call_counts: dict[FlowMethodName, int] = {}
self._max_method_calls = max_method_calls
self._persistence: FlowPersistence | None = persistence
self._is_execution_resuming: bool = False
self._event_futures: list[Future[None]] = []
@@ -1828,6 +1833,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
# Only enter resumption mode if there are completed methods to
# replay. When _completed_methods is empty (e.g. a pure
@@ -2569,6 +2575,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
- Skips execution if method was already completed (e.g., after reload)
- Catches and logs any exceptions during execution, preventing individual listener failures from breaking the entire flow
"""
count = self._method_call_counts.get(listener_name, 0) + 1
if count > self._max_method_calls:
raise RecursionError(
f"Method '{listener_name}' has been called {self._max_method_calls} times in "
f"this flow execution, which indicates an infinite loop. "
f"This commonly happens when a @listen label matches the "
f"method's own name."
)
self._method_call_counts[listener_name] = count
if listener_name in self._completed_methods:
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners

View File

@@ -123,7 +123,7 @@ class TestAgentExecutor:
executor.state.iterations = 10
result = executor.check_max_iterations()
assert result == "force_final_answer"
assert result == "max_iterations_exceeded"
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""

View File

@@ -1843,3 +1843,53 @@ def test_cyclic_flow_works_with_persist_and_id_input():
f"'{method}' should fire 3 times, "
f"got {len(events)}: {execution_order}"
)
@pytest.mark.timeout(5)
def test_self_listening_method_does_not_loop():
"""A method whose @listen label matches its own name must not loop forever.
Without the guard, 'process' re-triggers itself on every completion,
running indefinitely (timeout → FAIL). The fix caps method calls
and raises RecursionError (PASS).
"""
class SelfListenFlow(Flow):
@start()
def begin(self):
return "process"
@router(begin)
def route(self):
return "process"
@listen("process")
def process(self):
pass
flow = SelfListenFlow()
with pytest.raises(RecursionError, match="infinite loop"):
flow.kickoff()
def test_or_condition_self_listen_fires_once():
"""or_() with a self-referencing label only fires once due to or_() guard."""
call_count = 0
class OrSelfListenFlow(Flow):
@start()
def begin(self):
return "process"
@router(begin)
def route(self):
return "process"
@listen(or_("other_trigger", "process"))
def process(self):
nonlocal call_count
call_count += 1
flow = OrSelfListenFlow()
flow.kickoff()
assert call_count == 1