From 52d36ccaf96340e83ce26dd197ae687f08a578a2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 3 Mar 2026 03:33:33 +0000 Subject: [PATCH] feat: add agent loop detection middleware (#4682) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add LoopDetector class with sliding window tracking, configurable repetition threshold, and intervention strategies (inject_reflection, stop, or custom callback) - Add LoopDetectedEvent for observability via the event bus - Integrate loop detection into all 4 executor loop methods (sync/async ReAct and native tools) - Add loop_detector field to BaseAgent - Export LoopDetector from crewai public API - Add i18n support for loop detection warning message - Add 50 comprehensive unit tests Co-Authored-By: João --- lib/crewai/src/crewai/__init__.py | 2 + .../crewai/agents/agent_builder/base_agent.py | 10 + .../src/crewai/agents/crew_agent_executor.py | 124 +++++ lib/crewai/src/crewai/agents/loop_detector.py | 210 +++++++ .../src/crewai/events/types/loop_events.py | 48 ++ lib/crewai/src/crewai/translations/en.json | 1 + lib/crewai/tests/agents/test_loop_detector.py | 523 ++++++++++++++++++ 7 files changed, 918 insertions(+) create mode 100644 lib/crewai/src/crewai/agents/loop_detector.py create mode 100644 lib/crewai/src/crewai/events/types/loop_events.py create mode 100644 lib/crewai/tests/agents/test_loop_detector.py diff --git a/lib/crewai/src/crewai/__init__.py b/lib/crewai/src/crewai/__init__.py index 43bfc8de2..5f2e9dae3 100644 --- a/lib/crewai/src/crewai/__init__.py +++ b/lib/crewai/src/crewai/__init__.py @@ -4,6 +4,7 @@ import urllib.request import warnings from crewai.agent.core import Agent +from crewai.agents.loop_detector import LoopDetector from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput from crewai.flow.flow import Flow @@ -99,6 +100,7 @@ __all__ = [ "Flow", "Knowledge", "LLMGuardrail", + "LoopDetector", "Memory", "Process", "Task", diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py index 8b2b9737c..c0ba51538 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py @@ -22,6 +22,7 @@ from typing_extensions import Self from crewai.agent.internal.meta import AgentMeta from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess from crewai.agents.cache.cache_handler import CacheHandler +from crewai.agents.loop_detector import LoopDetector from crewai.agents.tools_handler import ToolsHandler from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.knowledge_config import KnowledgeConfig @@ -213,6 +214,15 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): "If not set, falls back to crew memory." ), ) + loop_detector: LoopDetector | None = Field( + default=None, + description=( + "Optional loop detection middleware. When set, monitors agent tool calls " + "for repetitive patterns and intervenes to break loops. " + "Pass a LoopDetector instance to configure window_size, " + "repetition_threshold, and on_loop behavior." + ), + ) @model_validator(mode="before") @classmethod diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index ff40489d9..1a24b80ff 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -17,6 +17,7 @@ from pydantic import BaseModel, GetCoreSchemaHandler, ValidationError from pydantic_core import CoreSchema, core_schema from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin +from crewai.agents.loop_detector import LoopDetector from crewai.agents.parser import ( AgentAction, AgentFinish, @@ -156,6 +157,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.messages: list[LLMMessage] = [] self.iterations = 0 self.log_error_after = 3 + self.loop_detector: LoopDetector | None = ( + agent.loop_detector if agent and hasattr(agent, "loop_detector") else None + ) + if self.loop_detector: + self.loop_detector.reset() self.before_llm_call_hooks: list[Callable[..., Any]] = [] self.after_llm_call_hooks: list[Callable[..., Any]] = [] self.before_llm_call_hooks.extend(get_before_llm_call_hooks()) @@ -427,6 +433,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer, tool_result ) + # Check for repetitive loop patterns + loop_result = self._record_and_check_loop( + formatted_answer.tool, formatted_answer.tool_input + ) + if loop_result is not None: + formatted_answer = loop_result + break + self._invoke_step_callback(formatted_answer) # type: ignore[arg-type] self._append_message(formatted_answer.text) # type: ignore[union-attr] @@ -783,6 +797,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if tool_finish: return tool_finish + # Check for loop after each parallel tool result + loop_result = self._record_and_check_loop( + execution_result["func_name"], + execution_result.get("tool_args", ""), + ) + if loop_result is not None: + return loop_result + reasoning_prompt = self._i18n.slice("post_tool_reasoning") reasoning_message: LLMMessage = { "role": "user", @@ -807,6 +829,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if tool_finish: return tool_finish + # Check for loop after sequential tool execution + loop_result = self._record_and_check_loop(func_name, func_args) + if loop_result is not None: + return loop_result + reasoning_prompt = self._i18n.slice("post_tool_reasoning") reasoning_message = { "role": "user", @@ -1068,6 +1095,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): "result": result, "from_cache": from_cache, "original_tool": original_tool, + "tool_args": func_args, } def _append_tool_result_and_check_finality( @@ -1263,6 +1291,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer, tool_result ) + # Check for repetitive loop patterns + loop_result = self._record_and_check_loop( + formatted_answer.tool, formatted_answer.tool_input + ) + if loop_result is not None: + formatted_answer = loop_result + break + await self._ainvoke_step_callback(formatted_answer) # type: ignore[arg-type] self._append_message(formatted_answer.text) # type: ignore[union-attr] @@ -1462,6 +1498,94 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self._show_logs(formatted_answer) return formatted_answer + def _record_and_check_loop( + self, tool_name: str, tool_args: str | dict[str, Any] + ) -> AgentFinish | None: + """Record a tool call and check for repetitive loop patterns. + + If a loop is detected, takes the configured action: + - ``inject_reflection``: injects a reflection prompt into messages. + - ``stop``: forces the agent to produce a final answer. + - callable: calls the user's callback and injects the returned message. + + Args: + tool_name: Name of the tool that was called. + tool_args: Arguments passed to the tool. + + Returns: + ``AgentFinish`` if the loop action is ``stop``, + ``None`` otherwise (including when reflection is injected). + """ + if not self.loop_detector: + return None + + self.loop_detector.record_tool_call(tool_name, tool_args) + + if not self.loop_detector.is_loop_detected(): + return None + + repeated_tool = self.loop_detector.get_repeated_tool_info() or tool_name + action_taken = ( + self.loop_detector.on_loop + if isinstance(self.loop_detector.on_loop, str) + else "callback" + ) + + # Emit loop detected event + from crewai.events.types.loop_events import LoopDetectedEvent + + crewai_event_bus.emit( + self.agent, + LoopDetectedEvent( + agent_role=self.agent.role if self.agent else "unknown", + agent_id=str(self.agent.id) if self.agent else None, + task_id=str(self.task.id) if self.task else None, + repeated_tool=repeated_tool, + action_taken=action_taken, + iteration=self.iterations, + agent=self.agent, + ), + ) + + if self.agent and self.agent.verbose: + self._printer.print( + content=f"Loop detected: {repeated_tool} called repeatedly. Action: {action_taken}", + color="bold_yellow", + ) + + if self.loop_detector.on_loop == "stop": + return handle_max_iterations_exceeded( + None, + printer=self._printer, + i18n=self._i18n, + messages=self.messages, + llm=self.llm, + callbacks=self.callbacks, + verbose=self.agent.verbose if self.agent else False, + ) + + # inject_reflection or callable + if callable(self.loop_detector.on_loop) and not isinstance( + self.loop_detector.on_loop, str + ): + message_text = self.loop_detector.get_loop_message() + else: + # Use i18n reflection prompt + count = self.loop_detector.repetition_threshold + message_text = self._i18n.slice("loop_detected").format( + repeated_tool=repeated_tool, count=count + ) + + loop_message: LLMMessage = { + "role": "user", + "content": message_text, + } + self.messages.append(loop_message) + + # Reset detector after intervention to give the agent a fresh window + self.loop_detector.reset() + return None + def _handle_agent_action( self, formatted_answer: AgentAction, tool_result: ToolResult ) -> AgentAction | AgentFinish: diff --git a/lib/crewai/src/crewai/agents/loop_detector.py b/lib/crewai/src/crewai/agents/loop_detector.py new file mode 100644 index 000000000..10f365e45 --- /dev/null +++ b/lib/crewai/src/crewai/agents/loop_detector.py @@ -0,0 +1,210 @@ +"""Loop detection for agent execution. + +Detects repetitive behavioral patterns in agent tool calls and provides +configurable intervention strategies to break out of loops. + +Example usage: + + from crewai import Agent + from crewai.agents.loop_detector import LoopDetector + + # Using default settings (window_size=5, repetition_threshold=3, inject_reflection) + agent = Agent( + role="Researcher", + goal="Find novel insights", + backstory="An experienced researcher", + loop_detector=LoopDetector(), + ) + + # Custom configuration + agent = Agent( + role="Researcher", + goal="Find novel insights", + backstory="An experienced researcher", + loop_detector=LoopDetector( + window_size=10, + repetition_threshold=4, + on_loop="stop", + ), + ) + + # With a custom callback + def my_callback(detector: LoopDetector) -> str: + return "You are repeating yourself. Try a completely different approach." + + agent = Agent( + role="Researcher", + goal="Find novel insights", + backstory="An experienced researcher", + loop_detector=LoopDetector( + on_loop=my_callback, + ), + ) +""" + +from __future__ import annotations + +from collections import deque +from collections.abc import Callable +import json +import logging +from typing import Any, Literal + +from pydantic import BaseModel, Field + + +logger = logging.getLogger(__name__) + + +class ToolCallRecord(BaseModel): + """Record of a single tool call for loop detection.""" + + tool_name: str + tool_args: str # Normalized JSON string of arguments + + def __eq__(self, other: object) -> bool: + if not isinstance(other, ToolCallRecord): + return NotImplemented + return self.tool_name == other.tool_name and self.tool_args == other.tool_args + + def __hash__(self) -> int: + return hash((self.tool_name, self.tool_args)) + + +class LoopDetector(BaseModel): + """Detects repetitive behavioral patterns in agent tool calls. + + Monitors a sliding window of recent tool calls and flags when + the same tool is called repeatedly with the same arguments, + indicating the agent is stuck in a loop. + + Args: + window_size: Number of recent tool calls to track in the + sliding window. Defaults to 5. + repetition_threshold: How many identical tool calls within + the window trigger loop detection. Defaults to 3. + on_loop: Action to take when a loop is detected. + - ``"inject_reflection"`` (default): Inject a meta-prompt + asking the agent to try a different approach. + - ``"stop"``: Force the agent to produce a final answer. + - A callable ``(LoopDetector) -> str``: Custom callback + that returns a message to inject into the conversation. + + Example:: + + from crewai import Agent + from crewai.agents.loop_detector import LoopDetector + + agent = Agent( + role="Researcher", + goal="Find novel insights about AI", + backstory="Senior researcher", + loop_detector=LoopDetector( + window_size=5, + repetition_threshold=3, + on_loop="inject_reflection", + ), + ) + """ + + window_size: int = Field( + default=5, + ge=2, + description="Number of recent tool calls to track in the sliding window.", + ) + repetition_threshold: int = Field( + default=3, + ge=2, + description="How many identical tool calls within the window trigger loop detection.", + ) + on_loop: Literal["inject_reflection", "stop"] | Callable[..., str] = Field( + default="inject_reflection", + description=( + "Action when loop is detected: 'inject_reflection' to inject a reflection prompt, " + "'stop' to force final answer, or a callable(LoopDetector) -> str for custom handling." + ), + ) + _history: deque[ToolCallRecord] = deque() + + def model_post_init(self, __context: Any) -> None: + """Initialize the internal deque with the configured window size.""" + object.__setattr__(self, "_history", deque(maxlen=self.window_size)) + + @staticmethod + def _normalize_args(args: str | dict[str, Any]) -> str: + """Normalize tool arguments to a canonical JSON string for comparison. + + Args: + args: Tool arguments as a string or dict. + + Returns: + A normalized JSON string with sorted keys. + """ + if isinstance(args, str): + try: + parsed = json.loads(args) + except (json.JSONDecodeError, TypeError): + return args.strip() + return json.dumps(parsed, sort_keys=True, default=str) + return json.dumps(args, sort_keys=True, default=str) + + def record_tool_call(self, tool_name: str, tool_args: str | dict[str, Any]) -> None: + """Record a tool call in the sliding window. + + Args: + tool_name: Name of the tool that was called. + tool_args: Arguments passed to the tool. + """ + record = ToolCallRecord( + tool_name=tool_name, + tool_args=self._normalize_args(tool_args), + ) + self._history.append(record) + + def is_loop_detected(self) -> bool: + """Check if a repetitive loop pattern is detected. + + Returns: + True if any single tool call (same name + same args) + appears at least ``repetition_threshold`` times within + the current window. + """ + if len(self._history) < self.repetition_threshold: + return False + + counts: dict[ToolCallRecord, int] = {} + for record in self._history: + counts[record] = counts.get(record, 0) + 1 + if counts[record] >= self.repetition_threshold: + return True + return False + + def get_loop_message(self) -> str: + """Get the intervention message based on the configured ``on_loop`` action. + + Returns: + A string message to inject into the agent conversation. + """ + if callable(self.on_loop) and not isinstance(self.on_loop, str): + return self.on_loop(self) + return "" + + def get_repeated_tool_info(self) -> str | None: + """Get information about the repeated tool call, if any. + + Returns: + A string describing the repeated tool and args, or None. + """ + if len(self._history) < self.repetition_threshold: + return None + + counts: dict[ToolCallRecord, int] = {} + for record in self._history: + counts[record] = counts.get(record, 0) + 1 + if counts[record] >= self.repetition_threshold: + return f"{record.tool_name}({record.tool_args})" + return None + + def reset(self) -> None: + """Clear the tool call history.""" + self._history.clear() diff --git a/lib/crewai/src/crewai/events/types/loop_events.py b/lib/crewai/src/crewai/events/types/loop_events.py new file mode 100644 index 000000000..a96c1723d --- /dev/null +++ b/lib/crewai/src/crewai/events/types/loop_events.py @@ -0,0 +1,48 @@ +"""Events related to agent loop detection.""" + +from __future__ import annotations + +from typing import Any + +from pydantic import ConfigDict, model_validator + +from crewai.events.base_events import BaseEvent + + +class LoopDetectedEvent(BaseEvent): + """Event emitted when a repetitive loop pattern is detected in agent behavior. + + Attributes: + agent_role: Role of the agent that is looping. + repeated_tool: Description of the repeated tool call (name + args). + action_taken: The action taken to break the loop. + iteration: Current iteration count when the loop was detected. + """ + + agent_role: str + agent_id: str | None = None + task_id: str | None = None + repeated_tool: str | None = None + action_taken: str + iteration: int + agent: Any | None = None + type: str = "loop_detected" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @model_validator(mode="after") + def set_fingerprint_data(self) -> LoopDetectedEvent: + """Set fingerprint data from the agent if available.""" + if ( + self.agent + and hasattr(self.agent, "fingerprint") + and self.agent.fingerprint + ): + self.source_fingerprint = self.agent.fingerprint.uuid_str + self.source_type = "agent" + if ( + hasattr(self.agent.fingerprint, "metadata") + and self.agent.fingerprint.metadata + ): + self.fingerprint_metadata = self.agent.fingerprint.metadata + return self diff --git a/lib/crewai/src/crewai/translations/en.json b/lib/crewai/src/crewai/translations/en.json index 1eb02c746..97b08661c 100644 --- a/lib/crewai/src/crewai/translations/en.json +++ b/lib/crewai/src/crewai/translations/en.json @@ -15,6 +15,7 @@ "native_tools": "", "native_task": "\nCurrent Task: {input}", "post_tool_reasoning": "Analyze the tool result. If requirements are met, provide the Final Answer. Otherwise, call the next tool. Deliver only the answer without meta-commentary.", + "loop_detected": "WARNING: You appear to be repeating the same action ({repeated_tool}). You have called it {count} times with the same arguments. This is not making progress. You MUST try a completely different approach — use a different tool, different arguments, or provide your Final Answer now.", "format": "Decide if you need a tool or can provide the final answer. Use one at a time.\nTo use a tool, use:\nThought: [reasoning]\nAction: [name from {tool_names}]\nAction Input: [JSON object]\n\nTo provide the final answer, use:\nThought: [reasoning]\nFinal Answer: [complete response]", "final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```", "format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```", diff --git a/lib/crewai/tests/agents/test_loop_detector.py b/lib/crewai/tests/agents/test_loop_detector.py new file mode 100644 index 000000000..17223983d --- /dev/null +++ b/lib/crewai/tests/agents/test_loop_detector.py @@ -0,0 +1,523 @@ +"""Unit tests for LoopDetector and loop detection integration. + +Tests cover: +- LoopDetector class: tracking, detection, message generation, configuration +- ToolCallRecord equality and hashing +- Edge cases: empty history, threshold boundaries, window overflow +- Integration with CrewAgentExecutor._record_and_check_loop +- LoopDetectedEvent creation +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from crewai.agents.loop_detector import LoopDetector, ToolCallRecord +from crewai.events.types.loop_events import LoopDetectedEvent + + +# --------------------------------------------------------------------------- +# ToolCallRecord +# --------------------------------------------------------------------------- + +class TestToolCallRecord: + """Tests for ToolCallRecord model.""" + + def test_equality_same_name_and_args(self): + a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + assert a == b + + def test_inequality_different_name(self): + a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + b = ToolCallRecord(tool_name="fetch", tool_args='{"q": "hello"}') + assert a != b + + def test_inequality_different_args(self): + a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}') + assert a != b + + def test_hash_same_records(self): + a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + assert hash(a) == hash(b) + + def test_hash_different_records(self): + a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}') + b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}') + # Different records should (almost certainly) have different hashes + assert hash(a) != hash(b) + + def test_equality_not_implemented_for_other_types(self): + a = ToolCallRecord(tool_name="search", tool_args="{}") + assert a != "not a record" + assert a.__eq__("not a record") is NotImplemented + + +# --------------------------------------------------------------------------- +# LoopDetector — Initialization +# --------------------------------------------------------------------------- + +class TestLoopDetectorInit: + """Tests for LoopDetector initialization and defaults.""" + + def test_default_values(self): + ld = LoopDetector() + assert ld.window_size == 5 + assert ld.repetition_threshold == 3 + assert ld.on_loop == "inject_reflection" + + def test_custom_values(self): + ld = LoopDetector(window_size=10, repetition_threshold=4, on_loop="stop") + assert ld.window_size == 10 + assert ld.repetition_threshold == 4 + assert ld.on_loop == "stop" + + def test_callable_on_loop(self): + def my_cb(detector: LoopDetector) -> str: + return "custom" + + ld = LoopDetector(on_loop=my_cb) + assert callable(ld.on_loop) + + def test_window_size_minimum(self): + with pytest.raises(Exception): + LoopDetector(window_size=1) + + def test_repetition_threshold_minimum(self): + with pytest.raises(Exception): + LoopDetector(repetition_threshold=1) + + +# --------------------------------------------------------------------------- +# LoopDetector — Argument Normalization +# --------------------------------------------------------------------------- + +class TestNormalizeArgs: + """Tests for LoopDetector._normalize_args static method.""" + + def test_dict_sorted_keys(self): + result = LoopDetector._normalize_args({"b": 2, "a": 1}) + assert result == '{"a": 1, "b": 2}' + + def test_json_string_normalized(self): + result = LoopDetector._normalize_args('{"b": 2, "a": 1}') + assert result == '{"a": 1, "b": 2}' + + def test_plain_string_stripped(self): + result = LoopDetector._normalize_args(" hello world ") + assert result == "hello world" + + def test_invalid_json_string(self): + result = LoopDetector._normalize_args("not json") + assert result == "not json" + + def test_empty_dict(self): + result = LoopDetector._normalize_args({}) + assert result == "{}" + + def test_empty_string(self): + result = LoopDetector._normalize_args("") + assert result == "" + + +# --------------------------------------------------------------------------- +# LoopDetector — Recording and Detection +# --------------------------------------------------------------------------- + +class TestLoopDetection: + """Tests for loop recording and detection logic.""" + + def test_no_loop_below_threshold(self): + ld = LoopDetector(repetition_threshold=3) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is False + + def test_loop_at_threshold(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(3): + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is True + + def test_loop_above_threshold(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(5): + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is True + + def test_no_loop_with_different_tools(self): + ld = LoopDetector(repetition_threshold=3) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("fetch", {"url": "http://example.com"}) + ld.record_tool_call("read", {"file": "data.txt"}) + assert ld.is_loop_detected() is False + + def test_no_loop_with_different_args(self): + ld = LoopDetector(repetition_threshold=3) + ld.record_tool_call("search", {"q": "test1"}) + ld.record_tool_call("search", {"q": "test2"}) + ld.record_tool_call("search", {"q": "test3"}) + assert ld.is_loop_detected() is False + + def test_loop_with_mixed_calls(self): + """Loop detected even if other calls are interspersed, as long as + threshold is met within the window.""" + ld = LoopDetector(window_size=5, repetition_threshold=3) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("fetch", {"url": "http://example.com"}) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is True + + def test_window_overflow_removes_old_calls(self): + """Old calls slide out of the window, potentially clearing the loop.""" + ld = LoopDetector(window_size=3, repetition_threshold=3) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + # Now add a different call — this pushes first "search" out when we add + # one more different call + ld.record_tool_call("fetch", {"url": "http://example.com"}) + # Only 2 "search" in window at this point, not 3 + assert ld.is_loop_detected() is False + + def test_loop_with_string_args(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(3): + ld.record_tool_call("search", '{"q": "test"}') + assert ld.is_loop_detected() is True + + def test_loop_with_equivalent_json_args(self): + """Args with different key order should be normalized and treated as equal.""" + ld = LoopDetector(repetition_threshold=3) + ld.record_tool_call("search", '{"b": 2, "a": 1}') + ld.record_tool_call("search", '{"a": 1, "b": 2}') + ld.record_tool_call("search", {"a": 1, "b": 2}) + assert ld.is_loop_detected() is True + + def test_empty_history(self): + ld = LoopDetector() + assert ld.is_loop_detected() is False + + def test_single_call(self): + ld = LoopDetector() + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is False + + +# --------------------------------------------------------------------------- +# LoopDetector — Reset +# --------------------------------------------------------------------------- + +class TestLoopDetectorReset: + """Tests for LoopDetector.reset().""" + + def test_reset_clears_history(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(3): + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is True + ld.reset() + assert ld.is_loop_detected() is False + + def test_reset_allows_fresh_tracking(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(3): + ld.record_tool_call("search", {"q": "test"}) + ld.reset() + # Now record again — should not trigger until threshold + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + assert ld.is_loop_detected() is False + + +# --------------------------------------------------------------------------- +# LoopDetector — Messages and Tool Info +# --------------------------------------------------------------------------- + +class TestLoopDetectorMessages: + """Tests for get_loop_message and get_repeated_tool_info.""" + + def test_get_loop_message_with_callable(self): + def my_callback(detector: LoopDetector) -> str: + return "Custom loop message" + + ld = LoopDetector(on_loop=my_callback) + assert ld.get_loop_message() == "Custom loop message" + + def test_get_loop_message_inject_reflection_returns_empty(self): + ld = LoopDetector(on_loop="inject_reflection") + assert ld.get_loop_message() == "" + + def test_get_loop_message_stop_returns_empty(self): + ld = LoopDetector(on_loop="stop") + assert ld.get_loop_message() == "" + + def test_get_repeated_tool_info_with_loop(self): + ld = LoopDetector(repetition_threshold=3) + for _ in range(3): + ld.record_tool_call("search", {"q": "test"}) + info = ld.get_repeated_tool_info() + assert info is not None + assert "search" in info + + def test_get_repeated_tool_info_no_loop(self): + ld = LoopDetector(repetition_threshold=3) + ld.record_tool_call("search", {"q": "test"}) + assert ld.get_repeated_tool_info() is None + + def test_get_repeated_tool_info_empty_history(self): + ld = LoopDetector() + assert ld.get_repeated_tool_info() is None + + def test_callback_receives_detector_instance(self): + received = {} + + def my_callback(detector: LoopDetector) -> str: + received["detector"] = detector + return "msg" + + ld = LoopDetector(on_loop=my_callback) + ld.get_loop_message() + assert received["detector"] is ld + + +# --------------------------------------------------------------------------- +# LoopDetectedEvent +# --------------------------------------------------------------------------- + +class TestLoopDetectedEvent: + """Tests for the LoopDetectedEvent model.""" + + def test_event_creation(self): + event = LoopDetectedEvent( + agent_role="Researcher", + agent_id="agent-1", + task_id="task-1", + repeated_tool="search({\"q\": \"test\"})", + action_taken="inject_reflection", + iteration=5, + ) + assert event.agent_role == "Researcher" + assert event.repeated_tool == 'search({"q": "test"})' + assert event.action_taken == "inject_reflection" + assert event.iteration == 5 + assert event.type == "loop_detected" + + def test_event_with_agent_fingerprint(self): + mock_agent = MagicMock() + mock_agent.fingerprint.uuid_str = "fp-123" + mock_agent.fingerprint.metadata = {"key": "value"} + + event = LoopDetectedEvent( + agent_role="Researcher", + action_taken="stop", + iteration=3, + agent=mock_agent, + ) + assert event.source_fingerprint == "fp-123" + assert event.fingerprint_metadata == {"key": "value"} + + def test_event_without_agent(self): + event = LoopDetectedEvent( + agent_role="Researcher", + action_taken="inject_reflection", + iteration=1, + ) + assert event.agent is None + assert event.agent_id is None + + +# --------------------------------------------------------------------------- +# Integration: CrewAgentExecutor._record_and_check_loop +# --------------------------------------------------------------------------- + +class TestRecordAndCheckLoop: + """Tests for the _record_and_check_loop helper on CrewAgentExecutor.""" + + @pytest.fixture + def _make_executor(self): + """Factory that builds a minimal CrewAgentExecutor-like object. + + We import the real class and patch its __init__ so we can + test _record_and_check_loop without setting up the full executor. + """ + from crewai.agents.crew_agent_executor import CrewAgentExecutor + + def _factory( + loop_detector: LoopDetector | None = None, + verbose: bool = False, + ) -> CrewAgentExecutor: + executor = object.__new__(CrewAgentExecutor) + # Minimal attributes required by _record_and_check_loop + executor.loop_detector = loop_detector + + mock_agent = Mock() + mock_agent.role = "Test Agent" + mock_agent.id = "agent-1" + mock_agent.verbose = verbose + executor.agent = mock_agent + + mock_task = Mock() + mock_task.id = "task-1" + executor.task = mock_task + + executor.iterations = 0 + executor.messages = [] + + mock_printer = Mock() + executor._printer = mock_printer + + mock_i18n = Mock() + mock_i18n.slice.return_value = ( + "WARNING: You appear to be repeating the same action ({repeated_tool}). " + "You have called it {count} times." + ) + executor._i18n = mock_i18n + + mock_llm = Mock() + executor.llm = mock_llm + executor.callbacks = [] + + return executor + + return _factory + + def test_no_loop_detector_returns_none(self, _make_executor): + executor = _make_executor(loop_detector=None) + result = executor._record_and_check_loop("search", {"q": "test"}) + assert result is None + + def test_no_loop_detected_returns_none(self, _make_executor): + ld = LoopDetector(repetition_threshold=3) + executor = _make_executor(loop_detector=ld) + # Only 1 call — no loop + result = executor._record_and_check_loop("search", {"q": "test"}) + assert result is None + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + def test_inject_reflection_appends_message(self, mock_bus, _make_executor): + ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection") + executor = _make_executor(loop_detector=ld) + + # Record 2 calls before the 3rd (which triggers detection) + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + result = executor._record_and_check_loop("search", {"q": "test"}) + + assert result is None # inject_reflection does NOT stop + assert len(executor.messages) == 1 + assert "repeating" in executor.messages[0]["content"].lower() + # Event should have been emitted + mock_bus.emit.assert_called_once() + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + @patch("crewai.agents.crew_agent_executor.handle_max_iterations_exceeded") + def test_stop_returns_agent_finish( + self, mock_handle_max, mock_bus, _make_executor + ): + from crewai.agents.parser import AgentFinish + + mock_handle_max.return_value = AgentFinish( + thought="forced", output="Stopped", text="Stopped" + ) + + ld = LoopDetector(repetition_threshold=3, on_loop="stop") + executor = _make_executor(loop_detector=ld) + + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + result = executor._record_and_check_loop("search", {"q": "test"}) + + assert result is not None + assert isinstance(result, AgentFinish) + mock_handle_max.assert_called_once() + mock_bus.emit.assert_called_once() + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + def test_callable_on_loop_injects_callback_message( + self, mock_bus, _make_executor + ): + def custom_callback(detector: LoopDetector) -> str: + return "CUSTOM: Stop repeating yourself!" + + ld = LoopDetector(repetition_threshold=3, on_loop=custom_callback) + executor = _make_executor(loop_detector=ld) + + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + result = executor._record_and_check_loop("search", {"q": "test"}) + + assert result is None + assert len(executor.messages) == 1 + assert "CUSTOM: Stop repeating yourself!" in executor.messages[0]["content"] + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + def test_detection_resets_after_intervention(self, mock_bus, _make_executor): + ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection") + executor = _make_executor(loop_detector=ld) + + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + # 3rd call triggers loop + executor._record_and_check_loop("search", {"q": "test"}) + + # After intervention, detector should be reset + assert ld.is_loop_detected() is False + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + def test_verbose_prints_message(self, mock_bus, _make_executor): + ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection") + executor = _make_executor(loop_detector=ld, verbose=True) + + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + executor._record_and_check_loop("search", {"q": "test"}) + + executor._printer.print.assert_called_once() + call_kwargs = executor._printer.print.call_args + assert "Loop detected" in call_kwargs.kwargs.get( + "content", call_kwargs[1].get("content", "") + ) + + @patch("crewai.agents.crew_agent_executor.crewai_event_bus") + def test_event_contains_correct_data(self, mock_bus, _make_executor): + ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection") + executor = _make_executor(loop_detector=ld) + + ld.record_tool_call("search", {"q": "test"}) + ld.record_tool_call("search", {"q": "test"}) + + executor._record_and_check_loop("search", {"q": "test"}) + + event = mock_bus.emit.call_args[0][1] + assert isinstance(event, LoopDetectedEvent) + assert event.agent_role == "Test Agent" + assert event.action_taken == "inject_reflection" + assert event.iteration == 0 # executor.iterations was 0 + + +# --------------------------------------------------------------------------- +# Public API import +# --------------------------------------------------------------------------- + +class TestPublicImport: + """Verify LoopDetector is importable from the public crewai package.""" + + def test_import_from_crewai(self): + from crewai import LoopDetector as LD + + assert LD is LoopDetector + + def test_in_all(self): + import crewai + + assert "LoopDetector" in crewai.__all__