Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
420ed828b7 fix: capture tool name/input before _handle_agent_action in ReAct loops
_handle_agent_action can convert AgentAction to AgentFinish (when
result_as_answer=True). Accessing .tool and .tool_input on AgentFinish
would raise AttributeError. Now we capture the values beforehand.

Co-Authored-By: João <joao@crewai.com>
2026-03-03 03:36:08 +00:00
Devin AI
52d36ccaf9 feat: add agent loop detection middleware (#4682)
- Add LoopDetector class with sliding window tracking, configurable
  repetition threshold, and intervention strategies (inject_reflection,
  stop, or custom callback)
- Add LoopDetectedEvent for observability via the event bus
- Integrate loop detection into all 4 executor loop methods (sync/async
  ReAct and native tools)
- Add loop_detector field to BaseAgent
- Export LoopDetector from crewai public API
- Add i18n support for loop detection warning message
- Add 50 comprehensive unit tests

Co-Authored-By: João <joao@crewai.com>
2026-03-03 03:33:33 +00:00
7 changed files with 928 additions and 0 deletions

View File

@@ -4,6 +4,7 @@ import urllib.request
import warnings
from crewai.agent.core import Agent
from crewai.agents.loop_detector import LoopDetector
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow.flow import Flow
@@ -99,6 +100,7 @@ __all__ = [
"Flow",
"Knowledge",
"LLMGuardrail",
"LoopDetector",
"Memory",
"Process",
"Task",

View File

@@ -22,6 +22,7 @@ from typing_extensions import Self
from crewai.agent.internal.meta import AgentMeta
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.loop_detector import LoopDetector
from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.knowledge_config import KnowledgeConfig
@@ -213,6 +214,15 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
"If not set, falls back to crew memory."
),
)
loop_detector: LoopDetector | None = Field(
default=None,
description=(
"Optional loop detection middleware. When set, monitors agent tool calls "
"for repetitive patterns and intervenes to break loops. "
"Pass a LoopDetector instance to configure window_size, "
"repetition_threshold, and on_loop behavior."
),
)
@model_validator(mode="before")
@classmethod

View File

@@ -17,6 +17,7 @@ from pydantic import BaseModel, GetCoreSchemaHandler, ValidationError
from pydantic_core import CoreSchema, core_schema
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.loop_detector import LoopDetector
from crewai.agents.parser import (
AgentAction,
AgentFinish,
@@ -156,6 +157,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: list[LLMMessage] = []
self.iterations = 0
self.log_error_after = 3
self.loop_detector: LoopDetector | None = (
agent.loop_detector if agent and hasattr(agent, "loop_detector") else None
)
if self.loop_detector:
self.loop_detector.reset()
self.before_llm_call_hooks: list[Callable[..., Any]] = []
self.after_llm_call_hooks: list[Callable[..., Any]] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
@@ -410,6 +416,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
}
# Capture tool info before _handle_agent_action may
# convert the answer to AgentFinish (result_as_answer).
_tool_name = formatted_answer.tool
_tool_input = formatted_answer.tool_input
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
fingerprint_context=fingerprint_context,
@@ -427,6 +438,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer, tool_result
)
# Check for repetitive loop patterns
loop_result = self._record_and_check_loop(
_tool_name, _tool_input
)
if loop_result is not None:
formatted_answer = loop_result
break
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr]
@@ -783,6 +802,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if tool_finish:
return tool_finish
# Check for loop after each parallel tool result
loop_result = self._record_and_check_loop(
execution_result["func_name"],
execution_result.get("tool_args", ""),
)
if loop_result is not None:
return loop_result
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
@@ -807,6 +834,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if tool_finish:
return tool_finish
# Check for loop after sequential tool execution
loop_result = self._record_and_check_loop(func_name, func_args)
if loop_result is not None:
return loop_result
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message = {
"role": "user",
@@ -1068,6 +1100,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"result": result,
"from_cache": from_cache,
"original_tool": original_tool,
"tool_args": func_args,
}
def _append_tool_result_and_check_finality(
@@ -1246,6 +1279,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
}
# Capture tool info before _handle_agent_action may
# convert the answer to AgentFinish (result_as_answer).
_tool_name = formatted_answer.tool
_tool_input = formatted_answer.tool_input
tool_result = await aexecute_tool_and_check_finality(
agent_action=formatted_answer,
fingerprint_context=fingerprint_context,
@@ -1263,6 +1301,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer, tool_result
)
# Check for repetitive loop patterns
loop_result = self._record_and_check_loop(
_tool_name, _tool_input
)
if loop_result is not None:
formatted_answer = loop_result
break
await self._ainvoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr]
@@ -1462,6 +1508,94 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _record_and_check_loop(
self, tool_name: str, tool_args: str | dict[str, Any]
) -> AgentFinish | None:
"""Record a tool call and check for repetitive loop patterns.
If a loop is detected, takes the configured action:
- ``inject_reflection``: injects a reflection prompt into messages.
- ``stop``: forces the agent to produce a final answer.
- callable: calls the user's callback and injects the returned message.
Args:
tool_name: Name of the tool that was called.
tool_args: Arguments passed to the tool.
Returns:
``AgentFinish`` if the loop action is ``stop``,
``None`` otherwise (including when reflection is injected).
"""
if not self.loop_detector:
return None
self.loop_detector.record_tool_call(tool_name, tool_args)
if not self.loop_detector.is_loop_detected():
return None
repeated_tool = self.loop_detector.get_repeated_tool_info() or tool_name
action_taken = (
self.loop_detector.on_loop
if isinstance(self.loop_detector.on_loop, str)
else "callback"
)
# Emit loop detected event
from crewai.events.types.loop_events import LoopDetectedEvent
crewai_event_bus.emit(
self.agent,
LoopDetectedEvent(
agent_role=self.agent.role if self.agent else "unknown",
agent_id=str(self.agent.id) if self.agent else None,
task_id=str(self.task.id) if self.task else None,
repeated_tool=repeated_tool,
action_taken=action_taken,
iteration=self.iterations,
agent=self.agent,
),
)
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Loop detected: {repeated_tool} called repeatedly. Action: {action_taken}",
color="bold_yellow",
)
if self.loop_detector.on_loop == "stop":
return handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose if self.agent else False,
)
# inject_reflection or callable
if callable(self.loop_detector.on_loop) and not isinstance(
self.loop_detector.on_loop, str
):
message_text = self.loop_detector.get_loop_message()
else:
# Use i18n reflection prompt
count = self.loop_detector.repetition_threshold
message_text = self._i18n.slice("loop_detected").format(
repeated_tool=repeated_tool, count=count
)
loop_message: LLMMessage = {
"role": "user",
"content": message_text,
}
self.messages.append(loop_message)
# Reset detector after intervention to give the agent a fresh window
self.loop_detector.reset()
return None
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:

View File

@@ -0,0 +1,210 @@
"""Loop detection for agent execution.
Detects repetitive behavioral patterns in agent tool calls and provides
configurable intervention strategies to break out of loops.
Example usage:
from crewai import Agent
from crewai.agents.loop_detector import LoopDetector
# Using default settings (window_size=5, repetition_threshold=3, inject_reflection)
agent = Agent(
role="Researcher",
goal="Find novel insights",
backstory="An experienced researcher",
loop_detector=LoopDetector(),
)
# Custom configuration
agent = Agent(
role="Researcher",
goal="Find novel insights",
backstory="An experienced researcher",
loop_detector=LoopDetector(
window_size=10,
repetition_threshold=4,
on_loop="stop",
),
)
# With a custom callback
def my_callback(detector: LoopDetector) -> str:
return "You are repeating yourself. Try a completely different approach."
agent = Agent(
role="Researcher",
goal="Find novel insights",
backstory="An experienced researcher",
loop_detector=LoopDetector(
on_loop=my_callback,
),
)
"""
from __future__ import annotations
from collections import deque
from collections.abc import Callable
import json
import logging
from typing import Any, Literal
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class ToolCallRecord(BaseModel):
"""Record of a single tool call for loop detection."""
tool_name: str
tool_args: str # Normalized JSON string of arguments
def __eq__(self, other: object) -> bool:
if not isinstance(other, ToolCallRecord):
return NotImplemented
return self.tool_name == other.tool_name and self.tool_args == other.tool_args
def __hash__(self) -> int:
return hash((self.tool_name, self.tool_args))
class LoopDetector(BaseModel):
"""Detects repetitive behavioral patterns in agent tool calls.
Monitors a sliding window of recent tool calls and flags when
the same tool is called repeatedly with the same arguments,
indicating the agent is stuck in a loop.
Args:
window_size: Number of recent tool calls to track in the
sliding window. Defaults to 5.
repetition_threshold: How many identical tool calls within
the window trigger loop detection. Defaults to 3.
on_loop: Action to take when a loop is detected.
- ``"inject_reflection"`` (default): Inject a meta-prompt
asking the agent to try a different approach.
- ``"stop"``: Force the agent to produce a final answer.
- A callable ``(LoopDetector) -> str``: Custom callback
that returns a message to inject into the conversation.
Example::
from crewai import Agent
from crewai.agents.loop_detector import LoopDetector
agent = Agent(
role="Researcher",
goal="Find novel insights about AI",
backstory="Senior researcher",
loop_detector=LoopDetector(
window_size=5,
repetition_threshold=3,
on_loop="inject_reflection",
),
)
"""
window_size: int = Field(
default=5,
ge=2,
description="Number of recent tool calls to track in the sliding window.",
)
repetition_threshold: int = Field(
default=3,
ge=2,
description="How many identical tool calls within the window trigger loop detection.",
)
on_loop: Literal["inject_reflection", "stop"] | Callable[..., str] = Field(
default="inject_reflection",
description=(
"Action when loop is detected: 'inject_reflection' to inject a reflection prompt, "
"'stop' to force final answer, or a callable(LoopDetector) -> str for custom handling."
),
)
_history: deque[ToolCallRecord] = deque()
def model_post_init(self, __context: Any) -> None:
"""Initialize the internal deque with the configured window size."""
object.__setattr__(self, "_history", deque(maxlen=self.window_size))
@staticmethod
def _normalize_args(args: str | dict[str, Any]) -> str:
"""Normalize tool arguments to a canonical JSON string for comparison.
Args:
args: Tool arguments as a string or dict.
Returns:
A normalized JSON string with sorted keys.
"""
if isinstance(args, str):
try:
parsed = json.loads(args)
except (json.JSONDecodeError, TypeError):
return args.strip()
return json.dumps(parsed, sort_keys=True, default=str)
return json.dumps(args, sort_keys=True, default=str)
def record_tool_call(self, tool_name: str, tool_args: str | dict[str, Any]) -> None:
"""Record a tool call in the sliding window.
Args:
tool_name: Name of the tool that was called.
tool_args: Arguments passed to the tool.
"""
record = ToolCallRecord(
tool_name=tool_name,
tool_args=self._normalize_args(tool_args),
)
self._history.append(record)
def is_loop_detected(self) -> bool:
"""Check if a repetitive loop pattern is detected.
Returns:
True if any single tool call (same name + same args)
appears at least ``repetition_threshold`` times within
the current window.
"""
if len(self._history) < self.repetition_threshold:
return False
counts: dict[ToolCallRecord, int] = {}
for record in self._history:
counts[record] = counts.get(record, 0) + 1
if counts[record] >= self.repetition_threshold:
return True
return False
def get_loop_message(self) -> str:
"""Get the intervention message based on the configured ``on_loop`` action.
Returns:
A string message to inject into the agent conversation.
"""
if callable(self.on_loop) and not isinstance(self.on_loop, str):
return self.on_loop(self)
return ""
def get_repeated_tool_info(self) -> str | None:
"""Get information about the repeated tool call, if any.
Returns:
A string describing the repeated tool and args, or None.
"""
if len(self._history) < self.repetition_threshold:
return None
counts: dict[ToolCallRecord, int] = {}
for record in self._history:
counts[record] = counts.get(record, 0) + 1
if counts[record] >= self.repetition_threshold:
return f"{record.tool_name}({record.tool_args})"
return None
def reset(self) -> None:
"""Clear the tool call history."""
self._history.clear()

View File

@@ -0,0 +1,48 @@
"""Events related to agent loop detection."""
from __future__ import annotations
from typing import Any
from pydantic import ConfigDict, model_validator
from crewai.events.base_events import BaseEvent
class LoopDetectedEvent(BaseEvent):
"""Event emitted when a repetitive loop pattern is detected in agent behavior.
Attributes:
agent_role: Role of the agent that is looping.
repeated_tool: Description of the repeated tool call (name + args).
action_taken: The action taken to break the loop.
iteration: Current iteration count when the loop was detected.
"""
agent_role: str
agent_id: str | None = None
task_id: str | None = None
repeated_tool: str | None = None
action_taken: str
iteration: int
agent: Any | None = None
type: str = "loop_detected"
model_config = ConfigDict(arbitrary_types_allowed=True)
@model_validator(mode="after")
def set_fingerprint_data(self) -> LoopDetectedEvent:
"""Set fingerprint data from the agent if available."""
if (
self.agent
and hasattr(self.agent, "fingerprint")
and self.agent.fingerprint
):
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self

View File

@@ -15,6 +15,7 @@
"native_tools": "",
"native_task": "\nCurrent Task: {input}",
"post_tool_reasoning": "Analyze the tool result. If requirements are met, provide the Final Answer. Otherwise, call the next tool. Deliver only the answer without meta-commentary.",
"loop_detected": "WARNING: You appear to be repeating the same action ({repeated_tool}). You have called it {count} times with the same arguments. This is not making progress. You MUST try a completely different approach — use a different tool, different arguments, or provide your Final Answer now.",
"format": "Decide if you need a tool or can provide the final answer. Use one at a time.\nTo use a tool, use:\nThought: [reasoning]\nAction: [name from {tool_names}]\nAction Input: [JSON object]\n\nTo provide the final answer, use:\nThought: [reasoning]\nFinal Answer: [complete response]",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",

View File

@@ -0,0 +1,523 @@
"""Unit tests for LoopDetector and loop detection integration.
Tests cover:
- LoopDetector class: tracking, detection, message generation, configuration
- ToolCallRecord equality and hashing
- Edge cases: empty history, threshold boundaries, window overflow
- Integration with CrewAgentExecutor._record_and_check_loop
- LoopDetectedEvent creation
"""
from __future__ import annotations
from unittest.mock import MagicMock, Mock, patch
import pytest
from crewai.agents.loop_detector import LoopDetector, ToolCallRecord
from crewai.events.types.loop_events import LoopDetectedEvent
# ---------------------------------------------------------------------------
# ToolCallRecord
# ---------------------------------------------------------------------------
class TestToolCallRecord:
"""Tests for ToolCallRecord model."""
def test_equality_same_name_and_args(self):
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
assert a == b
def test_inequality_different_name(self):
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
b = ToolCallRecord(tool_name="fetch", tool_args='{"q": "hello"}')
assert a != b
def test_inequality_different_args(self):
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}')
assert a != b
def test_hash_same_records(self):
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
assert hash(a) == hash(b)
def test_hash_different_records(self):
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}')
# Different records should (almost certainly) have different hashes
assert hash(a) != hash(b)
def test_equality_not_implemented_for_other_types(self):
a = ToolCallRecord(tool_name="search", tool_args="{}")
assert a != "not a record"
assert a.__eq__("not a record") is NotImplemented
# ---------------------------------------------------------------------------
# LoopDetector — Initialization
# ---------------------------------------------------------------------------
class TestLoopDetectorInit:
"""Tests for LoopDetector initialization and defaults."""
def test_default_values(self):
ld = LoopDetector()
assert ld.window_size == 5
assert ld.repetition_threshold == 3
assert ld.on_loop == "inject_reflection"
def test_custom_values(self):
ld = LoopDetector(window_size=10, repetition_threshold=4, on_loop="stop")
assert ld.window_size == 10
assert ld.repetition_threshold == 4
assert ld.on_loop == "stop"
def test_callable_on_loop(self):
def my_cb(detector: LoopDetector) -> str:
return "custom"
ld = LoopDetector(on_loop=my_cb)
assert callable(ld.on_loop)
def test_window_size_minimum(self):
with pytest.raises(Exception):
LoopDetector(window_size=1)
def test_repetition_threshold_minimum(self):
with pytest.raises(Exception):
LoopDetector(repetition_threshold=1)
# ---------------------------------------------------------------------------
# LoopDetector — Argument Normalization
# ---------------------------------------------------------------------------
class TestNormalizeArgs:
"""Tests for LoopDetector._normalize_args static method."""
def test_dict_sorted_keys(self):
result = LoopDetector._normalize_args({"b": 2, "a": 1})
assert result == '{"a": 1, "b": 2}'
def test_json_string_normalized(self):
result = LoopDetector._normalize_args('{"b": 2, "a": 1}')
assert result == '{"a": 1, "b": 2}'
def test_plain_string_stripped(self):
result = LoopDetector._normalize_args(" hello world ")
assert result == "hello world"
def test_invalid_json_string(self):
result = LoopDetector._normalize_args("not json")
assert result == "not json"
def test_empty_dict(self):
result = LoopDetector._normalize_args({})
assert result == "{}"
def test_empty_string(self):
result = LoopDetector._normalize_args("")
assert result == ""
# ---------------------------------------------------------------------------
# LoopDetector — Recording and Detection
# ---------------------------------------------------------------------------
class TestLoopDetection:
"""Tests for loop recording and detection logic."""
def test_no_loop_below_threshold(self):
ld = LoopDetector(repetition_threshold=3)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is False
def test_loop_at_threshold(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(3):
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is True
def test_loop_above_threshold(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(5):
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is True
def test_no_loop_with_different_tools(self):
ld = LoopDetector(repetition_threshold=3)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("fetch", {"url": "http://example.com"})
ld.record_tool_call("read", {"file": "data.txt"})
assert ld.is_loop_detected() is False
def test_no_loop_with_different_args(self):
ld = LoopDetector(repetition_threshold=3)
ld.record_tool_call("search", {"q": "test1"})
ld.record_tool_call("search", {"q": "test2"})
ld.record_tool_call("search", {"q": "test3"})
assert ld.is_loop_detected() is False
def test_loop_with_mixed_calls(self):
"""Loop detected even if other calls are interspersed, as long as
threshold is met within the window."""
ld = LoopDetector(window_size=5, repetition_threshold=3)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("fetch", {"url": "http://example.com"})
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is True
def test_window_overflow_removes_old_calls(self):
"""Old calls slide out of the window, potentially clearing the loop."""
ld = LoopDetector(window_size=3, repetition_threshold=3)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
# Now add a different call — this pushes first "search" out when we add
# one more different call
ld.record_tool_call("fetch", {"url": "http://example.com"})
# Only 2 "search" in window at this point, not 3
assert ld.is_loop_detected() is False
def test_loop_with_string_args(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(3):
ld.record_tool_call("search", '{"q": "test"}')
assert ld.is_loop_detected() is True
def test_loop_with_equivalent_json_args(self):
"""Args with different key order should be normalized and treated as equal."""
ld = LoopDetector(repetition_threshold=3)
ld.record_tool_call("search", '{"b": 2, "a": 1}')
ld.record_tool_call("search", '{"a": 1, "b": 2}')
ld.record_tool_call("search", {"a": 1, "b": 2})
assert ld.is_loop_detected() is True
def test_empty_history(self):
ld = LoopDetector()
assert ld.is_loop_detected() is False
def test_single_call(self):
ld = LoopDetector()
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is False
# ---------------------------------------------------------------------------
# LoopDetector — Reset
# ---------------------------------------------------------------------------
class TestLoopDetectorReset:
"""Tests for LoopDetector.reset()."""
def test_reset_clears_history(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(3):
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is True
ld.reset()
assert ld.is_loop_detected() is False
def test_reset_allows_fresh_tracking(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(3):
ld.record_tool_call("search", {"q": "test"})
ld.reset()
# Now record again — should not trigger until threshold
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
assert ld.is_loop_detected() is False
# ---------------------------------------------------------------------------
# LoopDetector — Messages and Tool Info
# ---------------------------------------------------------------------------
class TestLoopDetectorMessages:
"""Tests for get_loop_message and get_repeated_tool_info."""
def test_get_loop_message_with_callable(self):
def my_callback(detector: LoopDetector) -> str:
return "Custom loop message"
ld = LoopDetector(on_loop=my_callback)
assert ld.get_loop_message() == "Custom loop message"
def test_get_loop_message_inject_reflection_returns_empty(self):
ld = LoopDetector(on_loop="inject_reflection")
assert ld.get_loop_message() == ""
def test_get_loop_message_stop_returns_empty(self):
ld = LoopDetector(on_loop="stop")
assert ld.get_loop_message() == ""
def test_get_repeated_tool_info_with_loop(self):
ld = LoopDetector(repetition_threshold=3)
for _ in range(3):
ld.record_tool_call("search", {"q": "test"})
info = ld.get_repeated_tool_info()
assert info is not None
assert "search" in info
def test_get_repeated_tool_info_no_loop(self):
ld = LoopDetector(repetition_threshold=3)
ld.record_tool_call("search", {"q": "test"})
assert ld.get_repeated_tool_info() is None
def test_get_repeated_tool_info_empty_history(self):
ld = LoopDetector()
assert ld.get_repeated_tool_info() is None
def test_callback_receives_detector_instance(self):
received = {}
def my_callback(detector: LoopDetector) -> str:
received["detector"] = detector
return "msg"
ld = LoopDetector(on_loop=my_callback)
ld.get_loop_message()
assert received["detector"] is ld
# ---------------------------------------------------------------------------
# LoopDetectedEvent
# ---------------------------------------------------------------------------
class TestLoopDetectedEvent:
"""Tests for the LoopDetectedEvent model."""
def test_event_creation(self):
event = LoopDetectedEvent(
agent_role="Researcher",
agent_id="agent-1",
task_id="task-1",
repeated_tool="search({\"q\": \"test\"})",
action_taken="inject_reflection",
iteration=5,
)
assert event.agent_role == "Researcher"
assert event.repeated_tool == 'search({"q": "test"})'
assert event.action_taken == "inject_reflection"
assert event.iteration == 5
assert event.type == "loop_detected"
def test_event_with_agent_fingerprint(self):
mock_agent = MagicMock()
mock_agent.fingerprint.uuid_str = "fp-123"
mock_agent.fingerprint.metadata = {"key": "value"}
event = LoopDetectedEvent(
agent_role="Researcher",
action_taken="stop",
iteration=3,
agent=mock_agent,
)
assert event.source_fingerprint == "fp-123"
assert event.fingerprint_metadata == {"key": "value"}
def test_event_without_agent(self):
event = LoopDetectedEvent(
agent_role="Researcher",
action_taken="inject_reflection",
iteration=1,
)
assert event.agent is None
assert event.agent_id is None
# ---------------------------------------------------------------------------
# Integration: CrewAgentExecutor._record_and_check_loop
# ---------------------------------------------------------------------------
class TestRecordAndCheckLoop:
"""Tests for the _record_and_check_loop helper on CrewAgentExecutor."""
@pytest.fixture
def _make_executor(self):
"""Factory that builds a minimal CrewAgentExecutor-like object.
We import the real class and patch its __init__ so we can
test _record_and_check_loop without setting up the full executor.
"""
from crewai.agents.crew_agent_executor import CrewAgentExecutor
def _factory(
loop_detector: LoopDetector | None = None,
verbose: bool = False,
) -> CrewAgentExecutor:
executor = object.__new__(CrewAgentExecutor)
# Minimal attributes required by _record_and_check_loop
executor.loop_detector = loop_detector
mock_agent = Mock()
mock_agent.role = "Test Agent"
mock_agent.id = "agent-1"
mock_agent.verbose = verbose
executor.agent = mock_agent
mock_task = Mock()
mock_task.id = "task-1"
executor.task = mock_task
executor.iterations = 0
executor.messages = []
mock_printer = Mock()
executor._printer = mock_printer
mock_i18n = Mock()
mock_i18n.slice.return_value = (
"WARNING: You appear to be repeating the same action ({repeated_tool}). "
"You have called it {count} times."
)
executor._i18n = mock_i18n
mock_llm = Mock()
executor.llm = mock_llm
executor.callbacks = []
return executor
return _factory
def test_no_loop_detector_returns_none(self, _make_executor):
executor = _make_executor(loop_detector=None)
result = executor._record_and_check_loop("search", {"q": "test"})
assert result is None
def test_no_loop_detected_returns_none(self, _make_executor):
ld = LoopDetector(repetition_threshold=3)
executor = _make_executor(loop_detector=ld)
# Only 1 call — no loop
result = executor._record_and_check_loop("search", {"q": "test"})
assert result is None
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
def test_inject_reflection_appends_message(self, mock_bus, _make_executor):
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
executor = _make_executor(loop_detector=ld)
# Record 2 calls before the 3rd (which triggers detection)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
result = executor._record_and_check_loop("search", {"q": "test"})
assert result is None # inject_reflection does NOT stop
assert len(executor.messages) == 1
assert "repeating" in executor.messages[0]["content"].lower()
# Event should have been emitted
mock_bus.emit.assert_called_once()
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
@patch("crewai.agents.crew_agent_executor.handle_max_iterations_exceeded")
def test_stop_returns_agent_finish(
self, mock_handle_max, mock_bus, _make_executor
):
from crewai.agents.parser import AgentFinish
mock_handle_max.return_value = AgentFinish(
thought="forced", output="Stopped", text="Stopped"
)
ld = LoopDetector(repetition_threshold=3, on_loop="stop")
executor = _make_executor(loop_detector=ld)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
result = executor._record_and_check_loop("search", {"q": "test"})
assert result is not None
assert isinstance(result, AgentFinish)
mock_handle_max.assert_called_once()
mock_bus.emit.assert_called_once()
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
def test_callable_on_loop_injects_callback_message(
self, mock_bus, _make_executor
):
def custom_callback(detector: LoopDetector) -> str:
return "CUSTOM: Stop repeating yourself!"
ld = LoopDetector(repetition_threshold=3, on_loop=custom_callback)
executor = _make_executor(loop_detector=ld)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
result = executor._record_and_check_loop("search", {"q": "test"})
assert result is None
assert len(executor.messages) == 1
assert "CUSTOM: Stop repeating yourself!" in executor.messages[0]["content"]
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
def test_detection_resets_after_intervention(self, mock_bus, _make_executor):
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
executor = _make_executor(loop_detector=ld)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
# 3rd call triggers loop
executor._record_and_check_loop("search", {"q": "test"})
# After intervention, detector should be reset
assert ld.is_loop_detected() is False
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
def test_verbose_prints_message(self, mock_bus, _make_executor):
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
executor = _make_executor(loop_detector=ld, verbose=True)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
executor._record_and_check_loop("search", {"q": "test"})
executor._printer.print.assert_called_once()
call_kwargs = executor._printer.print.call_args
assert "Loop detected" in call_kwargs.kwargs.get(
"content", call_kwargs[1].get("content", "")
)
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
def test_event_contains_correct_data(self, mock_bus, _make_executor):
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
executor = _make_executor(loop_detector=ld)
ld.record_tool_call("search", {"q": "test"})
ld.record_tool_call("search", {"q": "test"})
executor._record_and_check_loop("search", {"q": "test"})
event = mock_bus.emit.call_args[0][1]
assert isinstance(event, LoopDetectedEvent)
assert event.agent_role == "Test Agent"
assert event.action_taken == "inject_reflection"
assert event.iteration == 0 # executor.iterations was 0
# ---------------------------------------------------------------------------
# Public API import
# ---------------------------------------------------------------------------
class TestPublicImport:
"""Verify LoopDetector is importable from the public crewai package."""
def test_import_from_crewai(self):
from crewai import LoopDetector as LD
assert LD is LoopDetector
def test_in_all(self):
import crewai
assert "LoopDetector" in crewai.__all__