diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 5f6b6f50f..4f2a92681 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -302,6 +302,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): super().__init__( suppress_flow_events=True, tracing=current_tracing if current_tracing else None, + max_method_calls=self.max_iter * 10, ) self._flow_initialized = True @@ -403,7 +404,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._setup_native_tools() return "initialized" - @listen("force_final_answer") + @listen("max_iterations_exceeded") def force_final_answer(self) -> Literal["agent_finished"]: """Force agent to provide final answer when max iterations exceeded.""" formatted_answer = handle_max_iterations_exceeded( @@ -655,11 +656,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): return "tool_result_is_final" reasoning_prompt = self._i18n.slice("post_tool_reasoning") - reasoning_message: LLMMessage = { + reasoning_message_post: LLMMessage = { "role": "user", "content": reasoning_prompt, } - self.state.messages.append(reasoning_message) + self.state.messages.append(reasoning_message_post) return "tool_completed" @@ -886,9 +887,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): call_id, func_name, func_args = info # Parse arguments - args_dict, parse_error = parse_tool_call_args(func_args, func_name, call_id) + parsed_args, parse_error = parse_tool_call_args(func_args, func_name, call_id) if parse_error is not None: return parse_error + args_dict: dict[str, Any] = parsed_args or {} # Get agent_key for event tracking agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown" @@ -1107,11 +1109,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): def check_max_iterations( self, ) -> Literal[ - "force_final_answer", "continue_reasoning", "continue_reasoning_native" + "max_iterations_exceeded", "continue_reasoning", "continue_reasoning_native" ]: """Check if max iterations reached before proceeding with reasoning.""" if has_reached_max_iterations(self.state.iterations, self.max_iter): - return "force_final_answer" + return "max_iterations_exceeded" if self.state.use_native_tools: return "continue_reasoning_native" return "continue_reasoning" diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 868f52632..e8ddc4765 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -692,6 +692,7 @@ class FlowMeta(type): condition_type = getattr( attr_value, "__condition_type__", OR_CONDITION ) + if ( hasattr(attr_value, "__trigger_condition__") and attr_value.__trigger_condition__ is not None @@ -769,6 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta): persistence: FlowPersistence | None = None, tracing: bool | None = None, suppress_flow_events: bool = False, + max_method_calls: int = 100, **kwargs: Any, ) -> None: """Initialize a new Flow instance. @@ -777,6 +779,7 @@ class Flow(Generic[T], metaclass=FlowMeta): persistence: Optional persistence backend for storing flow states tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings suppress_flow_events: Whether to suppress flow event emissions (internal use) + max_method_calls: Maximum times a single method can be called per execution before raising RecursionError **kwargs: Additional state values to initialize or override """ # Initialize basic instance attributes @@ -792,6 +795,8 @@ class Flow(Generic[T], metaclass=FlowMeta): self._completed_methods: set[FlowMethodName] = ( set() ) # Track completed methods for reload + self._method_call_counts: dict[FlowMethodName, int] = {} + self._max_method_calls = max_method_calls self._persistence: FlowPersistence | None = persistence self._is_execution_resuming: bool = False self._event_futures: list[Future[None]] = [] @@ -1828,6 +1833,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_outputs.clear() self._pending_and_listeners.clear() self._clear_or_listeners() + self._method_call_counts.clear() else: # Only enter resumption mode if there are completed methods to # replay. When _completed_methods is empty (e.g. a pure @@ -2569,6 +2575,16 @@ class Flow(Generic[T], metaclass=FlowMeta): - Skips execution if method was already completed (e.g., after reload) - Catches and logs any exceptions during execution, preventing individual listener failures from breaking the entire flow """ + count = self._method_call_counts.get(listener_name, 0) + 1 + if count > self._max_method_calls: + raise RecursionError( + f"Method '{listener_name}' has been called {self._max_method_calls} times in " + f"this flow execution, which indicates an infinite loop. " + f"This commonly happens when a @listen label matches the " + f"method's own name." + ) + self._method_call_counts[listener_name] = count + if listener_name in self._completed_methods: if self._is_execution_resuming: # During resumption, skip execution but continue listeners diff --git a/lib/crewai/tests/agents/test_agent_executor.py b/lib/crewai/tests/agents/test_agent_executor.py index ca731ab37..ab886ff38 100644 --- a/lib/crewai/tests/agents/test_agent_executor.py +++ b/lib/crewai/tests/agents/test_agent_executor.py @@ -123,7 +123,7 @@ class TestAgentExecutor: executor.state.iterations = 10 result = executor.check_max_iterations() - assert result == "force_final_answer" + assert result == "max_iterations_exceeded" def test_route_by_answer_type_action(self, mock_dependencies): """Test routing for AgentAction.""" diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 585b6881e..ccb08cb0a 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1843,3 +1843,53 @@ def test_cyclic_flow_works_with_persist_and_id_input(): f"'{method}' should fire 3 times, " f"got {len(events)}: {execution_order}" ) + + +@pytest.mark.timeout(5) +def test_self_listening_method_does_not_loop(): + """A method whose @listen label matches its own name must not loop forever. + + Without the guard, 'process' re-triggers itself on every completion, + running indefinitely (timeout → FAIL). The fix caps method calls + and raises RecursionError (PASS). + """ + + class SelfListenFlow(Flow): + @start() + def begin(self): + return "process" + + @router(begin) + def route(self): + return "process" + + @listen("process") + def process(self): + pass + + flow = SelfListenFlow() + with pytest.raises(RecursionError, match="infinite loop"): + flow.kickoff() + + +def test_or_condition_self_listen_fires_once(): + """or_() with a self-referencing label only fires once due to or_() guard.""" + call_count = 0 + + class OrSelfListenFlow(Flow): + @start() + def begin(self): + return "process" + + @router(begin) + def route(self): + return "process" + + @listen(or_("other_trigger", "process")) + def process(self): + nonlocal call_count + call_count += 1 + + flow = OrSelfListenFlow() + flow.kickoff() + assert call_count == 1