mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-10 13:58:13 +00:00
Compare commits
2 Commits
lg-mcp-err
...
devin/1772
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
420ed828b7 | ||
|
|
52d36ccaf9 |
@@ -4,6 +4,7 @@ import urllib.request
|
||||
import warnings
|
||||
|
||||
from crewai.agent.core import Agent
|
||||
from crewai.agents.loop_detector import LoopDetector
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.flow.flow import Flow
|
||||
@@ -99,6 +100,7 @@ __all__ = [
|
||||
"Flow",
|
||||
"Knowledge",
|
||||
"LLMGuardrail",
|
||||
"LoopDetector",
|
||||
"Memory",
|
||||
"Process",
|
||||
"Task",
|
||||
|
||||
@@ -22,6 +22,7 @@ from typing_extensions import Self
|
||||
from crewai.agent.internal.meta import AgentMeta
|
||||
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
|
||||
from crewai.agents.cache.cache_handler import CacheHandler
|
||||
from crewai.agents.loop_detector import LoopDetector
|
||||
from crewai.agents.tools_handler import ToolsHandler
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.knowledge_config import KnowledgeConfig
|
||||
@@ -213,6 +214,15 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
|
||||
"If not set, falls back to crew memory."
|
||||
),
|
||||
)
|
||||
loop_detector: LoopDetector | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Optional loop detection middleware. When set, monitors agent tool calls "
|
||||
"for repetitive patterns and intervenes to break loops. "
|
||||
"Pass a LoopDetector instance to configure window_size, "
|
||||
"repetition_threshold, and on_loop behavior."
|
||||
),
|
||||
)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
|
||||
@@ -17,6 +17,7 @@ from pydantic import BaseModel, GetCoreSchemaHandler, ValidationError
|
||||
from pydantic_core import CoreSchema, core_schema
|
||||
|
||||
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
|
||||
from crewai.agents.loop_detector import LoopDetector
|
||||
from crewai.agents.parser import (
|
||||
AgentAction,
|
||||
AgentFinish,
|
||||
@@ -156,6 +157,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.messages: list[LLMMessage] = []
|
||||
self.iterations = 0
|
||||
self.log_error_after = 3
|
||||
self.loop_detector: LoopDetector | None = (
|
||||
agent.loop_detector if agent and hasattr(agent, "loop_detector") else None
|
||||
)
|
||||
if self.loop_detector:
|
||||
self.loop_detector.reset()
|
||||
self.before_llm_call_hooks: list[Callable[..., Any]] = []
|
||||
self.after_llm_call_hooks: list[Callable[..., Any]] = []
|
||||
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
|
||||
@@ -410,6 +416,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
)
|
||||
}
|
||||
|
||||
# Capture tool info before _handle_agent_action may
|
||||
# convert the answer to AgentFinish (result_as_answer).
|
||||
_tool_name = formatted_answer.tool
|
||||
_tool_input = formatted_answer.tool_input
|
||||
|
||||
tool_result = execute_tool_and_check_finality(
|
||||
agent_action=formatted_answer,
|
||||
fingerprint_context=fingerprint_context,
|
||||
@@ -427,6 +438,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
formatted_answer, tool_result
|
||||
)
|
||||
|
||||
# Check for repetitive loop patterns
|
||||
loop_result = self._record_and_check_loop(
|
||||
_tool_name, _tool_input
|
||||
)
|
||||
if loop_result is not None:
|
||||
formatted_answer = loop_result
|
||||
break
|
||||
|
||||
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
|
||||
self._append_message(formatted_answer.text) # type: ignore[union-attr]
|
||||
|
||||
@@ -783,6 +802,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
if tool_finish:
|
||||
return tool_finish
|
||||
|
||||
# Check for loop after each parallel tool result
|
||||
loop_result = self._record_and_check_loop(
|
||||
execution_result["func_name"],
|
||||
execution_result.get("tool_args", ""),
|
||||
)
|
||||
if loop_result is not None:
|
||||
return loop_result
|
||||
|
||||
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
|
||||
reasoning_message: LLMMessage = {
|
||||
"role": "user",
|
||||
@@ -807,6 +834,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
if tool_finish:
|
||||
return tool_finish
|
||||
|
||||
# Check for loop after sequential tool execution
|
||||
loop_result = self._record_and_check_loop(func_name, func_args)
|
||||
if loop_result is not None:
|
||||
return loop_result
|
||||
|
||||
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
|
||||
reasoning_message = {
|
||||
"role": "user",
|
||||
@@ -1068,6 +1100,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
"result": result,
|
||||
"from_cache": from_cache,
|
||||
"original_tool": original_tool,
|
||||
"tool_args": func_args,
|
||||
}
|
||||
|
||||
def _append_tool_result_and_check_finality(
|
||||
@@ -1246,6 +1279,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
)
|
||||
}
|
||||
|
||||
# Capture tool info before _handle_agent_action may
|
||||
# convert the answer to AgentFinish (result_as_answer).
|
||||
_tool_name = formatted_answer.tool
|
||||
_tool_input = formatted_answer.tool_input
|
||||
|
||||
tool_result = await aexecute_tool_and_check_finality(
|
||||
agent_action=formatted_answer,
|
||||
fingerprint_context=fingerprint_context,
|
||||
@@ -1263,6 +1301,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
formatted_answer, tool_result
|
||||
)
|
||||
|
||||
# Check for repetitive loop patterns
|
||||
loop_result = self._record_and_check_loop(
|
||||
_tool_name, _tool_input
|
||||
)
|
||||
if loop_result is not None:
|
||||
formatted_answer = loop_result
|
||||
break
|
||||
|
||||
await self._ainvoke_step_callback(formatted_answer) # type: ignore[arg-type]
|
||||
self._append_message(formatted_answer.text) # type: ignore[union-attr]
|
||||
|
||||
@@ -1462,6 +1508,94 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self._show_logs(formatted_answer)
|
||||
return formatted_answer
|
||||
|
||||
def _record_and_check_loop(
|
||||
self, tool_name: str, tool_args: str | dict[str, Any]
|
||||
) -> AgentFinish | None:
|
||||
"""Record a tool call and check for repetitive loop patterns.
|
||||
|
||||
If a loop is detected, takes the configured action:
|
||||
- ``inject_reflection``: injects a reflection prompt into messages.
|
||||
- ``stop``: forces the agent to produce a final answer.
|
||||
- callable: calls the user's callback and injects the returned message.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool that was called.
|
||||
tool_args: Arguments passed to the tool.
|
||||
|
||||
Returns:
|
||||
``AgentFinish`` if the loop action is ``stop``,
|
||||
``None`` otherwise (including when reflection is injected).
|
||||
"""
|
||||
if not self.loop_detector:
|
||||
return None
|
||||
|
||||
self.loop_detector.record_tool_call(tool_name, tool_args)
|
||||
|
||||
if not self.loop_detector.is_loop_detected():
|
||||
return None
|
||||
|
||||
repeated_tool = self.loop_detector.get_repeated_tool_info() or tool_name
|
||||
action_taken = (
|
||||
self.loop_detector.on_loop
|
||||
if isinstance(self.loop_detector.on_loop, str)
|
||||
else "callback"
|
||||
)
|
||||
|
||||
# Emit loop detected event
|
||||
from crewai.events.types.loop_events import LoopDetectedEvent
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self.agent,
|
||||
LoopDetectedEvent(
|
||||
agent_role=self.agent.role if self.agent else "unknown",
|
||||
agent_id=str(self.agent.id) if self.agent else None,
|
||||
task_id=str(self.task.id) if self.task else None,
|
||||
repeated_tool=repeated_tool,
|
||||
action_taken=action_taken,
|
||||
iteration=self.iterations,
|
||||
agent=self.agent,
|
||||
),
|
||||
)
|
||||
|
||||
if self.agent and self.agent.verbose:
|
||||
self._printer.print(
|
||||
content=f"Loop detected: {repeated_tool} called repeatedly. Action: {action_taken}",
|
||||
color="bold_yellow",
|
||||
)
|
||||
|
||||
if self.loop_detector.on_loop == "stop":
|
||||
return handle_max_iterations_exceeded(
|
||||
None,
|
||||
printer=self._printer,
|
||||
i18n=self._i18n,
|
||||
messages=self.messages,
|
||||
llm=self.llm,
|
||||
callbacks=self.callbacks,
|
||||
verbose=self.agent.verbose if self.agent else False,
|
||||
)
|
||||
|
||||
# inject_reflection or callable
|
||||
if callable(self.loop_detector.on_loop) and not isinstance(
|
||||
self.loop_detector.on_loop, str
|
||||
):
|
||||
message_text = self.loop_detector.get_loop_message()
|
||||
else:
|
||||
# Use i18n reflection prompt
|
||||
count = self.loop_detector.repetition_threshold
|
||||
message_text = self._i18n.slice("loop_detected").format(
|
||||
repeated_tool=repeated_tool, count=count
|
||||
)
|
||||
|
||||
loop_message: LLMMessage = {
|
||||
"role": "user",
|
||||
"content": message_text,
|
||||
}
|
||||
self.messages.append(loop_message)
|
||||
|
||||
# Reset detector after intervention to give the agent a fresh window
|
||||
self.loop_detector.reset()
|
||||
return None
|
||||
|
||||
def _handle_agent_action(
|
||||
self, formatted_answer: AgentAction, tool_result: ToolResult
|
||||
) -> AgentAction | AgentFinish:
|
||||
|
||||
210
lib/crewai/src/crewai/agents/loop_detector.py
Normal file
210
lib/crewai/src/crewai/agents/loop_detector.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Loop detection for agent execution.
|
||||
|
||||
Detects repetitive behavioral patterns in agent tool calls and provides
|
||||
configurable intervention strategies to break out of loops.
|
||||
|
||||
Example usage:
|
||||
|
||||
from crewai import Agent
|
||||
from crewai.agents.loop_detector import LoopDetector
|
||||
|
||||
# Using default settings (window_size=5, repetition_threshold=3, inject_reflection)
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Find novel insights",
|
||||
backstory="An experienced researcher",
|
||||
loop_detector=LoopDetector(),
|
||||
)
|
||||
|
||||
# Custom configuration
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Find novel insights",
|
||||
backstory="An experienced researcher",
|
||||
loop_detector=LoopDetector(
|
||||
window_size=10,
|
||||
repetition_threshold=4,
|
||||
on_loop="stop",
|
||||
),
|
||||
)
|
||||
|
||||
# With a custom callback
|
||||
def my_callback(detector: LoopDetector) -> str:
|
||||
return "You are repeating yourself. Try a completely different approach."
|
||||
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Find novel insights",
|
||||
backstory="An experienced researcher",
|
||||
loop_detector=LoopDetector(
|
||||
on_loop=my_callback,
|
||||
),
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import deque
|
||||
from collections.abc import Callable
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ToolCallRecord(BaseModel):
|
||||
"""Record of a single tool call for loop detection."""
|
||||
|
||||
tool_name: str
|
||||
tool_args: str # Normalized JSON string of arguments
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, ToolCallRecord):
|
||||
return NotImplemented
|
||||
return self.tool_name == other.tool_name and self.tool_args == other.tool_args
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash((self.tool_name, self.tool_args))
|
||||
|
||||
|
||||
class LoopDetector(BaseModel):
|
||||
"""Detects repetitive behavioral patterns in agent tool calls.
|
||||
|
||||
Monitors a sliding window of recent tool calls and flags when
|
||||
the same tool is called repeatedly with the same arguments,
|
||||
indicating the agent is stuck in a loop.
|
||||
|
||||
Args:
|
||||
window_size: Number of recent tool calls to track in the
|
||||
sliding window. Defaults to 5.
|
||||
repetition_threshold: How many identical tool calls within
|
||||
the window trigger loop detection. Defaults to 3.
|
||||
on_loop: Action to take when a loop is detected.
|
||||
- ``"inject_reflection"`` (default): Inject a meta-prompt
|
||||
asking the agent to try a different approach.
|
||||
- ``"stop"``: Force the agent to produce a final answer.
|
||||
- A callable ``(LoopDetector) -> str``: Custom callback
|
||||
that returns a message to inject into the conversation.
|
||||
|
||||
Example::
|
||||
|
||||
from crewai import Agent
|
||||
from crewai.agents.loop_detector import LoopDetector
|
||||
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Find novel insights about AI",
|
||||
backstory="Senior researcher",
|
||||
loop_detector=LoopDetector(
|
||||
window_size=5,
|
||||
repetition_threshold=3,
|
||||
on_loop="inject_reflection",
|
||||
),
|
||||
)
|
||||
"""
|
||||
|
||||
window_size: int = Field(
|
||||
default=5,
|
||||
ge=2,
|
||||
description="Number of recent tool calls to track in the sliding window.",
|
||||
)
|
||||
repetition_threshold: int = Field(
|
||||
default=3,
|
||||
ge=2,
|
||||
description="How many identical tool calls within the window trigger loop detection.",
|
||||
)
|
||||
on_loop: Literal["inject_reflection", "stop"] | Callable[..., str] = Field(
|
||||
default="inject_reflection",
|
||||
description=(
|
||||
"Action when loop is detected: 'inject_reflection' to inject a reflection prompt, "
|
||||
"'stop' to force final answer, or a callable(LoopDetector) -> str for custom handling."
|
||||
),
|
||||
)
|
||||
_history: deque[ToolCallRecord] = deque()
|
||||
|
||||
def model_post_init(self, __context: Any) -> None:
|
||||
"""Initialize the internal deque with the configured window size."""
|
||||
object.__setattr__(self, "_history", deque(maxlen=self.window_size))
|
||||
|
||||
@staticmethod
|
||||
def _normalize_args(args: str | dict[str, Any]) -> str:
|
||||
"""Normalize tool arguments to a canonical JSON string for comparison.
|
||||
|
||||
Args:
|
||||
args: Tool arguments as a string or dict.
|
||||
|
||||
Returns:
|
||||
A normalized JSON string with sorted keys.
|
||||
"""
|
||||
if isinstance(args, str):
|
||||
try:
|
||||
parsed = json.loads(args)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return args.strip()
|
||||
return json.dumps(parsed, sort_keys=True, default=str)
|
||||
return json.dumps(args, sort_keys=True, default=str)
|
||||
|
||||
def record_tool_call(self, tool_name: str, tool_args: str | dict[str, Any]) -> None:
|
||||
"""Record a tool call in the sliding window.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool that was called.
|
||||
tool_args: Arguments passed to the tool.
|
||||
"""
|
||||
record = ToolCallRecord(
|
||||
tool_name=tool_name,
|
||||
tool_args=self._normalize_args(tool_args),
|
||||
)
|
||||
self._history.append(record)
|
||||
|
||||
def is_loop_detected(self) -> bool:
|
||||
"""Check if a repetitive loop pattern is detected.
|
||||
|
||||
Returns:
|
||||
True if any single tool call (same name + same args)
|
||||
appears at least ``repetition_threshold`` times within
|
||||
the current window.
|
||||
"""
|
||||
if len(self._history) < self.repetition_threshold:
|
||||
return False
|
||||
|
||||
counts: dict[ToolCallRecord, int] = {}
|
||||
for record in self._history:
|
||||
counts[record] = counts.get(record, 0) + 1
|
||||
if counts[record] >= self.repetition_threshold:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_loop_message(self) -> str:
|
||||
"""Get the intervention message based on the configured ``on_loop`` action.
|
||||
|
||||
Returns:
|
||||
A string message to inject into the agent conversation.
|
||||
"""
|
||||
if callable(self.on_loop) and not isinstance(self.on_loop, str):
|
||||
return self.on_loop(self)
|
||||
return ""
|
||||
|
||||
def get_repeated_tool_info(self) -> str | None:
|
||||
"""Get information about the repeated tool call, if any.
|
||||
|
||||
Returns:
|
||||
A string describing the repeated tool and args, or None.
|
||||
"""
|
||||
if len(self._history) < self.repetition_threshold:
|
||||
return None
|
||||
|
||||
counts: dict[ToolCallRecord, int] = {}
|
||||
for record in self._history:
|
||||
counts[record] = counts.get(record, 0) + 1
|
||||
if counts[record] >= self.repetition_threshold:
|
||||
return f"{record.tool_name}({record.tool_args})"
|
||||
return None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Clear the tool call history."""
|
||||
self._history.clear()
|
||||
48
lib/crewai/src/crewai/events/types/loop_events.py
Normal file
48
lib/crewai/src/crewai/events/types/loop_events.py
Normal file
@@ -0,0 +1,48 @@
|
||||
"""Events related to agent loop detection."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import ConfigDict, model_validator
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
|
||||
class LoopDetectedEvent(BaseEvent):
|
||||
"""Event emitted when a repetitive loop pattern is detected in agent behavior.
|
||||
|
||||
Attributes:
|
||||
agent_role: Role of the agent that is looping.
|
||||
repeated_tool: Description of the repeated tool call (name + args).
|
||||
action_taken: The action taken to break the loop.
|
||||
iteration: Current iteration count when the loop was detected.
|
||||
"""
|
||||
|
||||
agent_role: str
|
||||
agent_id: str | None = None
|
||||
task_id: str | None = None
|
||||
repeated_tool: str | None = None
|
||||
action_taken: str
|
||||
iteration: int
|
||||
agent: Any | None = None
|
||||
type: str = "loop_detected"
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def set_fingerprint_data(self) -> LoopDetectedEvent:
|
||||
"""Set fingerprint data from the agent if available."""
|
||||
if (
|
||||
self.agent
|
||||
and hasattr(self.agent, "fingerprint")
|
||||
and self.agent.fingerprint
|
||||
):
|
||||
self.source_fingerprint = self.agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.agent.fingerprint, "metadata")
|
||||
and self.agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.agent.fingerprint.metadata
|
||||
return self
|
||||
@@ -15,6 +15,7 @@
|
||||
"native_tools": "",
|
||||
"native_task": "\nCurrent Task: {input}",
|
||||
"post_tool_reasoning": "Analyze the tool result. If requirements are met, provide the Final Answer. Otherwise, call the next tool. Deliver only the answer without meta-commentary.",
|
||||
"loop_detected": "WARNING: You appear to be repeating the same action ({repeated_tool}). You have called it {count} times with the same arguments. This is not making progress. You MUST try a completely different approach — use a different tool, different arguments, or provide your Final Answer now.",
|
||||
"format": "Decide if you need a tool or can provide the final answer. Use one at a time.\nTo use a tool, use:\nThought: [reasoning]\nAction: [name from {tool_names}]\nAction Input: [JSON object]\n\nTo provide the final answer, use:\nThought: [reasoning]\nFinal Answer: [complete response]",
|
||||
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
|
||||
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
|
||||
|
||||
523
lib/crewai/tests/agents/test_loop_detector.py
Normal file
523
lib/crewai/tests/agents/test_loop_detector.py
Normal file
@@ -0,0 +1,523 @@
|
||||
"""Unit tests for LoopDetector and loop detection integration.
|
||||
|
||||
Tests cover:
|
||||
- LoopDetector class: tracking, detection, message generation, configuration
|
||||
- ToolCallRecord equality and hashing
|
||||
- Edge cases: empty history, threshold boundaries, window overflow
|
||||
- Integration with CrewAgentExecutor._record_and_check_loop
|
||||
- LoopDetectedEvent creation
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agents.loop_detector import LoopDetector, ToolCallRecord
|
||||
from crewai.events.types.loop_events import LoopDetectedEvent
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ToolCallRecord
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestToolCallRecord:
|
||||
"""Tests for ToolCallRecord model."""
|
||||
|
||||
def test_equality_same_name_and_args(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
assert a == b
|
||||
|
||||
def test_inequality_different_name(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
b = ToolCallRecord(tool_name="fetch", tool_args='{"q": "hello"}')
|
||||
assert a != b
|
||||
|
||||
def test_inequality_different_args(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}')
|
||||
assert a != b
|
||||
|
||||
def test_hash_same_records(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
b = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
assert hash(a) == hash(b)
|
||||
|
||||
def test_hash_different_records(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args='{"q": "hello"}')
|
||||
b = ToolCallRecord(tool_name="search", tool_args='{"q": "world"}')
|
||||
# Different records should (almost certainly) have different hashes
|
||||
assert hash(a) != hash(b)
|
||||
|
||||
def test_equality_not_implemented_for_other_types(self):
|
||||
a = ToolCallRecord(tool_name="search", tool_args="{}")
|
||||
assert a != "not a record"
|
||||
assert a.__eq__("not a record") is NotImplemented
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetector — Initialization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoopDetectorInit:
|
||||
"""Tests for LoopDetector initialization and defaults."""
|
||||
|
||||
def test_default_values(self):
|
||||
ld = LoopDetector()
|
||||
assert ld.window_size == 5
|
||||
assert ld.repetition_threshold == 3
|
||||
assert ld.on_loop == "inject_reflection"
|
||||
|
||||
def test_custom_values(self):
|
||||
ld = LoopDetector(window_size=10, repetition_threshold=4, on_loop="stop")
|
||||
assert ld.window_size == 10
|
||||
assert ld.repetition_threshold == 4
|
||||
assert ld.on_loop == "stop"
|
||||
|
||||
def test_callable_on_loop(self):
|
||||
def my_cb(detector: LoopDetector) -> str:
|
||||
return "custom"
|
||||
|
||||
ld = LoopDetector(on_loop=my_cb)
|
||||
assert callable(ld.on_loop)
|
||||
|
||||
def test_window_size_minimum(self):
|
||||
with pytest.raises(Exception):
|
||||
LoopDetector(window_size=1)
|
||||
|
||||
def test_repetition_threshold_minimum(self):
|
||||
with pytest.raises(Exception):
|
||||
LoopDetector(repetition_threshold=1)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetector — Argument Normalization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNormalizeArgs:
|
||||
"""Tests for LoopDetector._normalize_args static method."""
|
||||
|
||||
def test_dict_sorted_keys(self):
|
||||
result = LoopDetector._normalize_args({"b": 2, "a": 1})
|
||||
assert result == '{"a": 1, "b": 2}'
|
||||
|
||||
def test_json_string_normalized(self):
|
||||
result = LoopDetector._normalize_args('{"b": 2, "a": 1}')
|
||||
assert result == '{"a": 1, "b": 2}'
|
||||
|
||||
def test_plain_string_stripped(self):
|
||||
result = LoopDetector._normalize_args(" hello world ")
|
||||
assert result == "hello world"
|
||||
|
||||
def test_invalid_json_string(self):
|
||||
result = LoopDetector._normalize_args("not json")
|
||||
assert result == "not json"
|
||||
|
||||
def test_empty_dict(self):
|
||||
result = LoopDetector._normalize_args({})
|
||||
assert result == "{}"
|
||||
|
||||
def test_empty_string(self):
|
||||
result = LoopDetector._normalize_args("")
|
||||
assert result == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetector — Recording and Detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoopDetection:
|
||||
"""Tests for loop recording and detection logic."""
|
||||
|
||||
def test_no_loop_below_threshold(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_loop_at_threshold(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(3):
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is True
|
||||
|
||||
def test_loop_above_threshold(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(5):
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is True
|
||||
|
||||
def test_no_loop_with_different_tools(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("fetch", {"url": "http://example.com"})
|
||||
ld.record_tool_call("read", {"file": "data.txt"})
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_no_loop_with_different_args(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test1"})
|
||||
ld.record_tool_call("search", {"q": "test2"})
|
||||
ld.record_tool_call("search", {"q": "test3"})
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_loop_with_mixed_calls(self):
|
||||
"""Loop detected even if other calls are interspersed, as long as
|
||||
threshold is met within the window."""
|
||||
ld = LoopDetector(window_size=5, repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("fetch", {"url": "http://example.com"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is True
|
||||
|
||||
def test_window_overflow_removes_old_calls(self):
|
||||
"""Old calls slide out of the window, potentially clearing the loop."""
|
||||
ld = LoopDetector(window_size=3, repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
# Now add a different call — this pushes first "search" out when we add
|
||||
# one more different call
|
||||
ld.record_tool_call("fetch", {"url": "http://example.com"})
|
||||
# Only 2 "search" in window at this point, not 3
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_loop_with_string_args(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(3):
|
||||
ld.record_tool_call("search", '{"q": "test"}')
|
||||
assert ld.is_loop_detected() is True
|
||||
|
||||
def test_loop_with_equivalent_json_args(self):
|
||||
"""Args with different key order should be normalized and treated as equal."""
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
ld.record_tool_call("search", '{"b": 2, "a": 1}')
|
||||
ld.record_tool_call("search", '{"a": 1, "b": 2}')
|
||||
ld.record_tool_call("search", {"a": 1, "b": 2})
|
||||
assert ld.is_loop_detected() is True
|
||||
|
||||
def test_empty_history(self):
|
||||
ld = LoopDetector()
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_single_call(self):
|
||||
ld = LoopDetector()
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetector — Reset
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoopDetectorReset:
|
||||
"""Tests for LoopDetector.reset()."""
|
||||
|
||||
def test_reset_clears_history(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(3):
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is True
|
||||
ld.reset()
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
def test_reset_allows_fresh_tracking(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(3):
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.reset()
|
||||
# Now record again — should not trigger until threshold
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetector — Messages and Tool Info
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoopDetectorMessages:
|
||||
"""Tests for get_loop_message and get_repeated_tool_info."""
|
||||
|
||||
def test_get_loop_message_with_callable(self):
|
||||
def my_callback(detector: LoopDetector) -> str:
|
||||
return "Custom loop message"
|
||||
|
||||
ld = LoopDetector(on_loop=my_callback)
|
||||
assert ld.get_loop_message() == "Custom loop message"
|
||||
|
||||
def test_get_loop_message_inject_reflection_returns_empty(self):
|
||||
ld = LoopDetector(on_loop="inject_reflection")
|
||||
assert ld.get_loop_message() == ""
|
||||
|
||||
def test_get_loop_message_stop_returns_empty(self):
|
||||
ld = LoopDetector(on_loop="stop")
|
||||
assert ld.get_loop_message() == ""
|
||||
|
||||
def test_get_repeated_tool_info_with_loop(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
for _ in range(3):
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
info = ld.get_repeated_tool_info()
|
||||
assert info is not None
|
||||
assert "search" in info
|
||||
|
||||
def test_get_repeated_tool_info_no_loop(self):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
assert ld.get_repeated_tool_info() is None
|
||||
|
||||
def test_get_repeated_tool_info_empty_history(self):
|
||||
ld = LoopDetector()
|
||||
assert ld.get_repeated_tool_info() is None
|
||||
|
||||
def test_callback_receives_detector_instance(self):
|
||||
received = {}
|
||||
|
||||
def my_callback(detector: LoopDetector) -> str:
|
||||
received["detector"] = detector
|
||||
return "msg"
|
||||
|
||||
ld = LoopDetector(on_loop=my_callback)
|
||||
ld.get_loop_message()
|
||||
assert received["detector"] is ld
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopDetectedEvent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoopDetectedEvent:
|
||||
"""Tests for the LoopDetectedEvent model."""
|
||||
|
||||
def test_event_creation(self):
|
||||
event = LoopDetectedEvent(
|
||||
agent_role="Researcher",
|
||||
agent_id="agent-1",
|
||||
task_id="task-1",
|
||||
repeated_tool="search({\"q\": \"test\"})",
|
||||
action_taken="inject_reflection",
|
||||
iteration=5,
|
||||
)
|
||||
assert event.agent_role == "Researcher"
|
||||
assert event.repeated_tool == 'search({"q": "test"})'
|
||||
assert event.action_taken == "inject_reflection"
|
||||
assert event.iteration == 5
|
||||
assert event.type == "loop_detected"
|
||||
|
||||
def test_event_with_agent_fingerprint(self):
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.fingerprint.uuid_str = "fp-123"
|
||||
mock_agent.fingerprint.metadata = {"key": "value"}
|
||||
|
||||
event = LoopDetectedEvent(
|
||||
agent_role="Researcher",
|
||||
action_taken="stop",
|
||||
iteration=3,
|
||||
agent=mock_agent,
|
||||
)
|
||||
assert event.source_fingerprint == "fp-123"
|
||||
assert event.fingerprint_metadata == {"key": "value"}
|
||||
|
||||
def test_event_without_agent(self):
|
||||
event = LoopDetectedEvent(
|
||||
agent_role="Researcher",
|
||||
action_taken="inject_reflection",
|
||||
iteration=1,
|
||||
)
|
||||
assert event.agent is None
|
||||
assert event.agent_id is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration: CrewAgentExecutor._record_and_check_loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRecordAndCheckLoop:
|
||||
"""Tests for the _record_and_check_loop helper on CrewAgentExecutor."""
|
||||
|
||||
@pytest.fixture
|
||||
def _make_executor(self):
|
||||
"""Factory that builds a minimal CrewAgentExecutor-like object.
|
||||
|
||||
We import the real class and patch its __init__ so we can
|
||||
test _record_and_check_loop without setting up the full executor.
|
||||
"""
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
|
||||
def _factory(
|
||||
loop_detector: LoopDetector | None = None,
|
||||
verbose: bool = False,
|
||||
) -> CrewAgentExecutor:
|
||||
executor = object.__new__(CrewAgentExecutor)
|
||||
# Minimal attributes required by _record_and_check_loop
|
||||
executor.loop_detector = loop_detector
|
||||
|
||||
mock_agent = Mock()
|
||||
mock_agent.role = "Test Agent"
|
||||
mock_agent.id = "agent-1"
|
||||
mock_agent.verbose = verbose
|
||||
executor.agent = mock_agent
|
||||
|
||||
mock_task = Mock()
|
||||
mock_task.id = "task-1"
|
||||
executor.task = mock_task
|
||||
|
||||
executor.iterations = 0
|
||||
executor.messages = []
|
||||
|
||||
mock_printer = Mock()
|
||||
executor._printer = mock_printer
|
||||
|
||||
mock_i18n = Mock()
|
||||
mock_i18n.slice.return_value = (
|
||||
"WARNING: You appear to be repeating the same action ({repeated_tool}). "
|
||||
"You have called it {count} times."
|
||||
)
|
||||
executor._i18n = mock_i18n
|
||||
|
||||
mock_llm = Mock()
|
||||
executor.llm = mock_llm
|
||||
executor.callbacks = []
|
||||
|
||||
return executor
|
||||
|
||||
return _factory
|
||||
|
||||
def test_no_loop_detector_returns_none(self, _make_executor):
|
||||
executor = _make_executor(loop_detector=None)
|
||||
result = executor._record_and_check_loop("search", {"q": "test"})
|
||||
assert result is None
|
||||
|
||||
def test_no_loop_detected_returns_none(self, _make_executor):
|
||||
ld = LoopDetector(repetition_threshold=3)
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
# Only 1 call — no loop
|
||||
result = executor._record_and_check_loop("search", {"q": "test"})
|
||||
assert result is None
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
def test_inject_reflection_appends_message(self, mock_bus, _make_executor):
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
|
||||
# Record 2 calls before the 3rd (which triggers detection)
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
result = executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
assert result is None # inject_reflection does NOT stop
|
||||
assert len(executor.messages) == 1
|
||||
assert "repeating" in executor.messages[0]["content"].lower()
|
||||
# Event should have been emitted
|
||||
mock_bus.emit.assert_called_once()
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
@patch("crewai.agents.crew_agent_executor.handle_max_iterations_exceeded")
|
||||
def test_stop_returns_agent_finish(
|
||||
self, mock_handle_max, mock_bus, _make_executor
|
||||
):
|
||||
from crewai.agents.parser import AgentFinish
|
||||
|
||||
mock_handle_max.return_value = AgentFinish(
|
||||
thought="forced", output="Stopped", text="Stopped"
|
||||
)
|
||||
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop="stop")
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
result = executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, AgentFinish)
|
||||
mock_handle_max.assert_called_once()
|
||||
mock_bus.emit.assert_called_once()
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
def test_callable_on_loop_injects_callback_message(
|
||||
self, mock_bus, _make_executor
|
||||
):
|
||||
def custom_callback(detector: LoopDetector) -> str:
|
||||
return "CUSTOM: Stop repeating yourself!"
|
||||
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop=custom_callback)
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
result = executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
assert result is None
|
||||
assert len(executor.messages) == 1
|
||||
assert "CUSTOM: Stop repeating yourself!" in executor.messages[0]["content"]
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
def test_detection_resets_after_intervention(self, mock_bus, _make_executor):
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
# 3rd call triggers loop
|
||||
executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
# After intervention, detector should be reset
|
||||
assert ld.is_loop_detected() is False
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
def test_verbose_prints_message(self, mock_bus, _make_executor):
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
|
||||
executor = _make_executor(loop_detector=ld, verbose=True)
|
||||
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
executor._printer.print.assert_called_once()
|
||||
call_kwargs = executor._printer.print.call_args
|
||||
assert "Loop detected" in call_kwargs.kwargs.get(
|
||||
"content", call_kwargs[1].get("content", "")
|
||||
)
|
||||
|
||||
@patch("crewai.agents.crew_agent_executor.crewai_event_bus")
|
||||
def test_event_contains_correct_data(self, mock_bus, _make_executor):
|
||||
ld = LoopDetector(repetition_threshold=3, on_loop="inject_reflection")
|
||||
executor = _make_executor(loop_detector=ld)
|
||||
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
ld.record_tool_call("search", {"q": "test"})
|
||||
|
||||
executor._record_and_check_loop("search", {"q": "test"})
|
||||
|
||||
event = mock_bus.emit.call_args[0][1]
|
||||
assert isinstance(event, LoopDetectedEvent)
|
||||
assert event.agent_role == "Test Agent"
|
||||
assert event.action_taken == "inject_reflection"
|
||||
assert event.iteration == 0 # executor.iterations was 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API import
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPublicImport:
|
||||
"""Verify LoopDetector is importable from the public crewai package."""
|
||||
|
||||
def test_import_from_crewai(self):
|
||||
from crewai import LoopDetector as LD
|
||||
|
||||
assert LD is LoopDetector
|
||||
|
||||
def test_in_all(self):
|
||||
import crewai
|
||||
|
||||
assert "LoopDetector" in crewai.__all__
|
||||
Reference in New Issue
Block a user