This commit is contained in:
João Moura
2025-06-02 10:18:50 -07:00
parent efebcd9734
commit e4e9bf343a
8 changed files with 979 additions and 836 deletions

View File

@@ -1,176 +1,386 @@
"""Agent state management for long-running tasks."""
"""Agent state management for long-running tasks with focus on progress tracking."""
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union, Set
from pydantic import BaseModel, Field
from datetime import datetime
import json
class ToolUsage(BaseModel):
    """Record of a single tool usage."""

    # Identity of the call: which tool ran and with what (possibly truncated) arguments.
    tool_name: str = Field(description="Name of the tool used")
    arguments: Dict[str, Any] = Field(description="Arguments passed to the tool (may be truncated)")
    # Outcome metadata: optional short summary of the result plus when and in
    # which execution step the call happened.
    result_summary: Optional[str] = Field(default=None, description="Brief summary of the tool's result")
    timestamp: datetime = Field(default_factory=datetime.now, description="When the tool was used")
    step_number: int = Field(description="Which execution step this tool was used in")
class CriterionProgress(BaseModel):
    """Progress tracking for a single acceptance criterion."""

    criterion: str = Field(description="The acceptance criterion")
    status: str = Field(default="not_started", description="Status: not_started, in_progress, completed")
    progress_notes: str = Field(default="", description="Specific progress made towards this criterion")
    completion_percentage: int = Field(default=0, description="Estimated completion percentage (0-100)")
    remaining_work: str = Field(default="", description="What still needs to be done for this criterion")
    # Enhanced tracking: item-level bookkeeping so progress can be derived
    # deterministically from counts instead of only from LLM estimates.
    processed_items: Set[str] = Field(default_factory=set, description="IDs or identifiers of processed items")
    total_items_expected: Optional[int] = Field(default=None, description="Total number of items expected (if known)")
    items_to_process: List[str] = Field(default_factory=list, description="Queue of specific items to process next")
    # Timestamp of the last mutation to this record.
    last_updated: datetime = Field(default_factory=datetime.now)
class ProgressLog(BaseModel):
    """Single log entry for progress tracking."""

    # When the entry was recorded (defaults to creation time).
    timestamp: datetime = Field(default_factory=datetime.now)
    action: str = Field(description="What action was taken")
    result: str = Field(description="Result or outcome of the action")
    items_processed: List[str] = Field(default_factory=list, description="Items processed in this action")
    # Optional back-reference to the acceptance criterion this entry advanced.
    criterion: Optional[str] = Field(default=None, description="Related acceptance criterion")
class AgentState(BaseModel):
"""Persistent state object for agent task execution.
"""Enhanced state management with deterministic progress tracking.
This state object helps agents maintain coherence during long-running tasks
by tracking plans, progress, and intermediate results without relying solely
on conversation history.
This state helps agents maintain focus during long executions by tracking
specific progress against each acceptance criterion with detailed logging.
"""
# Core fields
original_plan: List[str] = Field(
# Core planning elements
plan: List[str] = Field(
default_factory=list,
description="The initial plan from first reasoning pass. Never overwrite unless user requests complete replan"
description="The current plan steps"
)
acceptance_criteria: List[str] = Field(
default_factory=list,
description="Concrete goals to satisfy for task completion"
description="Concrete criteria that must be met for task completion"
)
# Progress tracking
criteria_progress: Dict[str, CriterionProgress] = Field(
default_factory=dict,
description="Detailed progress for each acceptance criterion"
)
# Data storage
scratchpad: Dict[str, Any] = Field(
default_factory=dict,
description="Agent-defined storage for intermediate results and metadata"
description="Storage for intermediate results and data"
)
tool_usage_history: List[ToolUsage] = Field(
# Simple tracking
current_focus: str = Field(
default="",
description="What the agent should be focusing on right now"
)
next_steps: List[str] = Field(
default_factory=list,
description="Detailed history of tool usage including arguments and results"
description="Immediate next steps to take"
)
# Additional tracking fields
task_id: Optional[str] = Field(
default=None,
description="ID of the current task being executed"
)
created_at: datetime = Field(
default_factory=datetime.now,
description="When this state was created"
)
last_updated: datetime = Field(
default_factory=datetime.now,
description="When this state was last modified"
)
steps_completed: int = Field(
overall_progress: int = Field(
default=0,
description="Number of execution steps completed"
description="Overall task completion percentage (0-100)"
)
def set_original_plan(self, plan: List[str]) -> None:
    """Record the initial plan, but only on the first call.

    Later calls are no-ops, so the original plan is never silently
    overwritten; a successful write refreshes the modification timestamp.
    """
    if self.original_plan:
        return
    self.original_plan = plan
    self.last_updated = datetime.now()
# Enhanced tracking
progress_logs: List[ProgressLog] = Field(
default_factory=list,
description="Detailed log of all progress made"
)
work_queue: List[Dict[str, Any]] = Field(
default_factory=list,
description="Queue of specific work items to process"
)
# Metadata tracking
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata for tracking (e.g., total count expectations)"
)
def initialize_criteria_progress(self) -> None:
    """Create a CriterionProgress entry for every acceptance criterion.

    Criteria that already have an entry are left untouched, so previously
    recorded progress is never discarded.
    """
    tracked = self.criteria_progress
    for untracked in (c for c in self.acceptance_criteria if c not in tracked):
        tracked[untracked] = CriterionProgress(criterion=untracked)
def update_criterion_progress(
    self,
    criterion: str,
    status: str,
    progress_notes: str,
    completion_percentage: int,
    remaining_work: str,
    processed_items: Optional[List[str]] = None,
    items_to_process: Optional[List[str]] = None,
    total_items_expected: Optional[int] = None
) -> None:
    """Update progress for a specific criterion with enhanced tracking.

    Args:
        criterion: The acceptance criterion to update; unknown criteria
            are silently ignored.
        status: New status string (not_started, in_progress, completed).
        progress_notes: Free-form notes on progress made.
        completion_percentage: Estimated completion, clamped to 0-100.
        remaining_work: Description of what is still outstanding.
        processed_items: Item identifiers to add to the processed set.
        items_to_process: Replacement (not extension) of the pending queue.
        total_items_expected: Total item count for this criterion, if known.
    """
    if criterion not in self.criteria_progress:
        return
    progress = self.criteria_progress[criterion]
    progress.status = status
    progress.progress_notes = progress_notes
    progress.completion_percentage = max(0, min(100, completion_percentage))
    progress.remaining_work = remaining_work
    progress.last_updated = datetime.now()
    # Accumulate processed item IDs; the set makes duplicates harmless.
    if processed_items:
        progress.processed_items.update(processed_items)
    # Replace the pending-items queue when a new one is supplied.
    if items_to_process is not None:
        progress.items_to_process = items_to_process
    if total_items_expected is not None:
        progress.total_items_expected = total_items_expected
    # When the expected total is known, derive the percentage from actual
    # counts. Clamp at 100: the previous version could report values above
    # 100 when more items were processed than originally expected.
    if progress.total_items_expected and progress.total_items_expected > 0:
        progress.completion_percentage = min(
            100,
            int((len(progress.processed_items) / progress.total_items_expected) * 100),
        )
    # Keep the aggregate figure in sync with per-criterion progress.
    self._recalculate_overall_progress()
def _recalculate_overall_progress(self) -> None:
"""Recalculate overall progress based on all criteria."""
if not self.criteria_progress:
self.overall_progress = 0
return
total_progress = sum(p.completion_percentage for p in self.criteria_progress.values())
self.overall_progress = int(total_progress / len(self.criteria_progress))
def add_to_scratchpad(self, key: str, value: Any) -> None:
    """Store *value* under *key*, overwriting any existing entry."""
    self.last_updated = datetime.now()
    self.scratchpad[key] = value
def record_tool_usage(
self,
tool_name: str,
arguments: Dict[str, Any],
result_summary: Optional[str] = None,
max_arg_length: int = 200
) -> None:
"""Record a tool usage with truncated arguments.
# Analyze the data for item tracking
self._analyze_scratchpad_for_items(key, value)
Args:
tool_name: Name of the tool used
arguments: Arguments passed to the tool
result_summary: Optional brief summary of the result
max_arg_length: Maximum length for string arguments before truncation
"""
# Truncate long string arguments to prevent state bloat
truncated_args = {}
for key, value in arguments.items():
if isinstance(value, str) and len(value) > max_arg_length:
truncated_args[key] = value[:max_arg_length] + "..."
elif isinstance(value, (list, dict)):
# For complex types, store a summary
truncated_args[key] = f"<{type(value).__name__} with {len(value)} items>"
else:
truncated_args[key] = value
def _analyze_scratchpad_for_items(self, key: str, value: Any) -> None:
"""Analyze scratchpad data to extract trackable items."""
# If it's a list, try to extract IDs
if isinstance(value, list) and value:
item_ids = []
for item in value:
if isinstance(item, dict):
# Look for common ID fields
for id_field in ['id', 'ID', 'uid', 'uuid', 'message_id', 'email_id']:
if id_field in item:
item_ids.append(str(item[id_field]))
break
tool_usage = ToolUsage(
tool_name=tool_name,
arguments=truncated_args,
result_summary=result_summary,
step_number=self.steps_completed
if item_ids:
# Store metadata about this list
self.metadata[f"{key}_ids"] = item_ids
self.metadata[f"{key}_count"] = len(value)
def log_progress(self, action: str, result: str, items_processed: Optional[List[str]] = None, criterion: Optional[str] = None) -> None:
"""Add a progress log entry."""
log_entry = ProgressLog(
action=action,
result=result,
items_processed=items_processed or [],
criterion=criterion
)
self.progress_logs.append(log_entry)
self.tool_usage_history.append(tool_usage)
self.last_updated = datetime.now()
def add_to_work_queue(self, work_item: Dict[str, Any]) -> None:
    """Append a single work item to the end of the pending queue."""
    self.work_queue += [work_item]
def increment_steps(self) -> None:
    """Advance the completed-step counter and refresh the timestamp."""
    self.steps_completed = self.steps_completed + 1
    self.last_updated = datetime.now()
def get_next_work_item(self) -> Optional[Dict[str, Any]]:
    """Pop and return the head of the work queue, or None when it is empty."""
    if not self.work_queue:
        return None
    return self.work_queue.pop(0)
def reset(self, task_id: Optional[str] = None) -> None:
    """Clear all task state so a fresh task can begin.

    Args:
        task_id: Identifier of the new task, if any.
    """
    self.steps_completed = 0
    self.task_id = task_id
    self.original_plan = []
    self.acceptance_criteria = []
    self.scratchpad = {}
    self.tool_usage_history = []
    self.created_at = datetime.now()
    self.last_updated = datetime.now()
def set_focus_and_next_steps(self, focus: str, next_steps: List[str]) -> None:
    """Record what to work on right now and the immediate follow-up steps."""
    self.next_steps = next_steps
    self.current_focus = focus
def to_context_string(self) -> str:
"""Generate a concise string representation for LLM context."""
context = f"Current State (Step {self.steps_completed}):\n"
context += f"- Task ID: {self.task_id}\n"
def get_progress_context(self) -> str:
"""Generate a focused progress update for the agent."""
context = f"📊 PROGRESS UPDATE (Overall: {self.overall_progress}%)\n"
context += "="*50 + "\n\n"
if self.acceptance_criteria:
context += "- Acceptance Criteria:\n"
for criterion in self.acceptance_criteria:
context += f"{criterion}\n"
# Current focus
if self.current_focus:
context += f"🎯 CURRENT FOCUS: {self.current_focus}\n\n"
if self.original_plan:
context += "- Plan:\n"
for i, step in enumerate(self.original_plan, 1):
context += f" {i}. {step}\n"
# Progress on each criterion with detailed tracking
if self.criteria_progress:
context += "📋 ACCEPTANCE CRITERIA PROGRESS:\n"
for criterion, progress in self.criteria_progress.items():
status_emoji = "" if progress.status == "completed" else "🔄" if progress.status == "in_progress" else "⏸️"
context += f"\n{status_emoji} {criterion}\n"
if self.tool_usage_history:
context += "- Recent Tool Usage:\n"
# Show last 5 tool uses
recent_tools = self.tool_usage_history[-5:]
for usage in recent_tools:
context += f" • Step {usage.step_number}: {usage.tool_name}"
if usage.arguments:
args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2])
context += f"({args_preview})"
# Show detailed progress
if progress.total_items_expected:
context += f" Progress: {len(progress.processed_items)}/{progress.total_items_expected} items ({progress.completion_percentage}%)\n"
else:
context += f" Progress: {progress.completion_percentage}%"
if progress.processed_items:
context += f" - {len(progress.processed_items)} items processed"
context += "\n"
if progress.progress_notes:
context += f" Notes: {progress.progress_notes}\n"
# Show next items to process
if progress.items_to_process and progress.status != "completed":
next_items = progress.items_to_process[:3] # Show next 3
context += f" Next items: {', '.join(next_items)}"
if len(progress.items_to_process) > 3:
context += f" (and {len(progress.items_to_process) - 3} more)"
context += "\n"
if progress.remaining_work and progress.status != "completed":
context += f" Still needed: {progress.remaining_work}\n"
# Work queue status
if self.work_queue:
context += f"\n📝 WORK QUEUE: {len(self.work_queue)} items pending\n"
next_work = self.work_queue[0]
context += f" Next: {next_work.get('description', 'Process next item')}\n"
# Next steps
if self.next_steps:
context += f"\n📍 IMMEDIATE NEXT STEPS:\n"
for i, step in enumerate(self.next_steps, 1):
context += f"{i}. {step}\n"
# Available data
if self.scratchpad:
context += f"\n💾 AVAILABLE DATA IN SCRATCHPAD:\n"
for key, value in self.scratchpad.items():
if isinstance(value, list):
context += f"'{key}' - {len(value)} items"
if f"{key}_ids" in self.metadata:
context += f" (IDs tracked)"
context += "\n"
elif isinstance(value, dict):
context += f"'{key}' - dictionary data\n"
else:
context += f"'{key}'\n"
# Recent progress logs
if self.progress_logs:
context += f"\n📜 RECENT ACTIVITY:\n"
for log in self.progress_logs[-3:]: # Show last 3 logs
context += f"{log.timestamp.strftime('%H:%M:%S')} - {log.action}"
if log.items_processed:
context += f" ({len(log.items_processed)} items)"
context += "\n"
if self.scratchpad:
context += "- Scratchpad:\n"
for key, value in self.scratchpad.items():
context += f"{key}: {value}\n"
context += "\n" + "="*50
return context
def get_tools_summary(self) -> Dict[str, Any]:
"""Get a summary of tool usage statistics."""
if not self.tool_usage_history:
return {"total_tool_uses": 0, "unique_tools": 0, "tools_by_frequency": {}}
def analyze_scratchpad_for_criterion_progress(self, criterion: str) -> Dict[str, Any]:
"""Analyze scratchpad data to determine specific progress on a criterion."""
analysis = {
"relevant_data": [],
"item_count": 0,
"processed_ids": set(),
"data_completeness": 0,
"specific_gaps": []
}
tool_counts = {}
for usage in self.tool_usage_history:
tool_counts[usage.tool_name] = tool_counts.get(usage.tool_name, 0) + 1
criterion_lower = criterion.lower()
return {
"total_tool_uses": len(self.tool_usage_history),
"unique_tools": len(set(usage.tool_name for usage in self.tool_usage_history)),
"tools_by_frequency": dict(sorted(tool_counts.items(), key=lambda x: x[1], reverse=True))
}
# Look for data that relates to this criterion
for key, value in self.scratchpad.items():
key_lower = key.lower()
# Check if this data is relevant to the criterion
is_relevant = False
for keyword in criterion_lower.split():
if len(keyword) > 3 and keyword in key_lower: # Skip short words
is_relevant = True
break
if is_relevant:
analysis["relevant_data"].append(key)
# Count items and extract IDs
if isinstance(value, list):
analysis["item_count"] += len(value)
# Try to extract IDs from metadata
if f"{key}_ids" in self.metadata:
analysis["processed_ids"].update(self.metadata[f"{key}_ids"])
elif isinstance(value, dict):
analysis["item_count"] += 1
# Calculate completeness based on what we know
if analysis["item_count"] > 0:
# Check if criterion mentions specific numbers
import re
number_match = re.search(r'\b(\d+)\b', criterion)
if number_match:
expected_count = int(number_match.group(1))
analysis["data_completeness"] = min(100, int((analysis["item_count"] / expected_count) * 100))
if analysis["item_count"] < expected_count:
analysis["specific_gaps"].append(f"Need {expected_count - analysis['item_count']} more items")
else:
# For criteria without specific numbers, use heuristics
if "all" in criterion_lower or "every" in criterion_lower:
# For "all" criteria, we need to be more careful
analysis["data_completeness"] = 50 if analysis["item_count"] > 0 else 0
analysis["specific_gaps"].append("Verify all items are included")
else:
analysis["data_completeness"] = min(100, analysis["item_count"] * 20) # Rough estimate
return analysis
def generate_specific_next_steps(self, criterion: str) -> List[str]:
    """Generate specific, actionable next steps for a criterion.

    Picks the most concrete suggestion available, in priority order:
    the pending-items queue, then the known remaining item count, then
    untouched relevant scratchpad data, and finally keyword-based
    generic advice derived from the criterion text.
    """
    scratch_info = self.analyze_scratchpad_for_criterion_progress(criterion)
    tracked = self.criteria_progress.get(criterion)
    if not tracked:
        return ["Initialize progress tracking for this criterion"]

    # Priority 1: an explicit queue of items still to process.
    queue = tracked.items_to_process
    if queue:
        steps = [f"Query/process item: {queue[0]}"]
        if len(queue) > 1:
            steps.append(f"Then process {len(queue) - 1} remaining items")
        return steps

    # Priority 2: a known target count with some items already done.
    if tracked.processed_items and tracked.total_items_expected:
        left = tracked.total_items_expected - len(tracked.processed_items)
        if left > 0:
            return [f"Process {left} more items to reach target of {tracked.total_items_expected}"]
        return []

    # Priority 3: relevant scratchpad data that has not been touched yet.
    if scratch_info["relevant_data"] and not tracked.processed_items:
        return [f"Access and process data from '{k}'"
                for k in scratch_info["relevant_data"][:2]]

    # Fallback: generic advice keyed off words in the criterion itself.
    lowered = criterion.lower()
    if "email" in lowered:
        return ["Use email search/fetch tool to gather emails"]
    if "analyze" in lowered or "summary" in lowered:
        return ["Access stored data and create analysis/summary"]
    return [f"Use appropriate tools to gather data for: {criterion}"]
def reset(self) -> None:
    """Return every tracking field to its pristine, empty value."""
    for list_field in ("plan", "acceptance_criteria", "next_steps",
                       "progress_logs", "work_queue"):
        setattr(self, list_field, [])
    for dict_field in ("criteria_progress", "scratchpad", "metadata"):
        setattr(self, dict_field, {})
    self.current_focus = ""
    self.overall_progress = 0

File diff suppressed because it is too large Load Diff

View File

@@ -74,7 +74,7 @@ class FilteredStream(io.TextIOBase):
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
or "consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0

View File

@@ -1,6 +1,6 @@
"""Tool for accessing data stored in the agent's scratchpad during reasoning."""
from typing import Any, Dict, Optional, Type, Union
from typing import Any, Dict, Optional, Type, Union, Callable
from pydantic import BaseModel, Field
from crewai.tools import BaseTool
@@ -29,6 +29,10 @@ class ScratchpadTool(BaseTool):
args_schema: Type[BaseModel] = ScratchpadToolSchema
scratchpad_data: Dict[str, Any] = Field(default_factory=dict)
# Allow repeated usage of this tool - scratchpad access should not be limited
cache_function: Callable = lambda _args, _result: False # Don't cache scratchpad access
allow_repeated_usage: bool = True # Allow accessing the same key multiple times
def __init__(self, scratchpad_data: Optional[Dict[str, Any]] = None, **kwargs):
"""Initialize the scratchpad tool with optional initial data.
@@ -53,25 +57,46 @@ class ScratchpadTool(BaseTool):
Returns:
The value associated with the key, or an error message if not found
"""
print(f"[DEBUG] ScratchpadTool._run called with key: '{key}'")
print(f"[DEBUG] Current scratchpad keys: {list(self.scratchpad_data.keys())}")
print(f"[DEBUG] Scratchpad data size: {len(self.scratchpad_data)}")
if not self.scratchpad_data:
return (
"❌ SCRATCHPAD IS EMPTY\n\n"
"The scratchpad does not contain any data yet.\n"
"Data will be automatically stored here as you use other tools.\n"
"Try executing other tools first to gather information."
"Try executing other tools first to gather information.\n\n"
"💡 TIP: Tools like search, read, or fetch operations will automatically store their results in the scratchpad."
)
if key not in self.scratchpad_data:
available_keys = list(self.scratchpad_data.keys())
keys_formatted = "\n".join(f" - '{k}'" for k in available_keys)
# Create more helpful examples based on actual keys
example_key = available_keys[0] if available_keys else 'example_key'
# Check if the user tried a similar key (case-insensitive or partial match)
similar_keys = [k for k in available_keys if key.lower() in k.lower() or k.lower() in key.lower()]
similarity_hint = ""
if similar_keys:
similarity_hint = f"\n\n🔍 Did you mean one of these?\n" + "\n".join(f" - '{k}'" for k in similar_keys)
return (
f"❌ KEY NOT FOUND: '{key}'\n\n"
f"❌ KEY NOT FOUND: '{key}'\n"
f"{'='*50}\n\n"
f"The key '{key}' does not exist in the scratchpad.\n\n"
f"Available keys:\n{keys_formatted}\n\n"
f"To retrieve data, use the EXACT key name from the list above.\n"
f"Example Action Input: {{\"key\": \"{available_keys[0] if available_keys else 'example_key'}\"}}\n\n"
f"Remember: Keys are case-sensitive and must match exactly!"
f"📦 AVAILABLE KEYS IN SCRATCHPAD:\n{keys_formatted}\n"
f"{similarity_hint}\n\n"
f"✅ CORRECT USAGE EXAMPLE:\n"
f"Action: Access Scratchpad Memory\n"
f"Action Input: {{\"key\": \"{example_key}\"}}\n\n"
f"⚠️ IMPORTANT:\n"
f"- Keys are case-sensitive and must match EXACTLY\n"
f"- Use the exact key name from the list above\n"
f"- Do NOT modify or guess key names\n\n"
f"{'='*50}"
)
value = self.scratchpad_data[key]
@@ -79,12 +104,16 @@ class ScratchpadTool(BaseTool):
# Format the output nicely based on the type
if isinstance(value, dict):
import json
return json.dumps(value, indent=2)
formatted_output = f"✅ Successfully retrieved data for key '{key}':\n\n"
formatted_output += json.dumps(value, indent=2)
return formatted_output
elif isinstance(value, list):
import json
return json.dumps(value, indent=2)
formatted_output = f"✅ Successfully retrieved data for key '{key}':\n\n"
formatted_output += json.dumps(value, indent=2)
return formatted_output
else:
return str(value)
return f"✅ Successfully retrieved data for key '{key}':\n\n{str(value)}"
def update_scratchpad(self, new_data: Dict[str, Any]) -> None:
"""Update the scratchpad data and refresh the tool description.

View File

@@ -39,6 +39,8 @@ class BaseTool(BaseModel, ABC):
"""Maximum number of times this tool can be used. None means unlimited usage."""
current_usage_count: int = 0
"""Current number of times this tool has been used."""
allow_repeated_usage: bool = False
"""Flag to allow this tool to be used repeatedly with the same arguments."""
@field_validator("args_schema", mode="before")
@classmethod
@@ -57,7 +59,7 @@ class BaseTool(BaseModel, ABC):
},
},
)
@field_validator("max_usage_count", mode="before")
@classmethod
def validate_max_usage_count(cls, v: int | None) -> int | None:
@@ -81,11 +83,11 @@ class BaseTool(BaseModel, ABC):
# If _run is async, we safely run it
if asyncio.iscoroutine(result):
result = asyncio.run(result)
self.current_usage_count += 1
return result
def reset_usage_count(self) -> None:
"""Reset the current usage count to zero."""
self.current_usage_count = 0
@@ -109,6 +111,8 @@ class BaseTool(BaseModel, ABC):
result_as_answer=self.result_as_answer,
max_usage_count=self.max_usage_count,
current_usage_count=self.current_usage_count,
allow_repeated_usage=self.allow_repeated_usage,
cache_function=self.cache_function,
)
@classmethod
@@ -272,7 +276,7 @@ def to_langchain(
def tool(*args, result_as_answer: bool = False, max_usage_count: int | None = None) -> Callable:
"""
Decorator to create a tool from a function.
Args:
*args: Positional arguments, either the function to decorate or the tool name.
result_as_answer: Flag to indicate if the tool result should be used as the final agent answer.

View File

@@ -25,6 +25,8 @@ class CrewStructuredTool:
result_as_answer: bool = False,
max_usage_count: int | None = None,
current_usage_count: int = 0,
allow_repeated_usage: bool = False,
cache_function: Optional[Callable] = None,
) -> None:
"""Initialize the structured tool.
@@ -36,6 +38,8 @@ class CrewStructuredTool:
result_as_answer: Whether to return the output directly
max_usage_count: Maximum number of times this tool can be used. None means unlimited usage.
current_usage_count: Current number of times this tool has been used.
allow_repeated_usage: Whether to allow this tool to be used repeatedly with the same arguments.
cache_function: Function that will be used to determine if the tool should be cached.
"""
self.name = name
self.description = description
@@ -45,6 +49,8 @@ class CrewStructuredTool:
self.result_as_answer = result_as_answer
self.max_usage_count = max_usage_count
self.current_usage_count = current_usage_count
self.allow_repeated_usage = allow_repeated_usage
self.cache_function = cache_function if cache_function is not None else lambda _args=None, _result=None: True
# Validate the function signature matches the schema
self._validate_function_signature()

View File

@@ -149,7 +149,13 @@ class ToolUsage:
tool: CrewStructuredTool,
calling: Union[ToolCalling, InstructorToolCalling],
) -> str:
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
# Check if tool allows repeated usage before blocking
allows_repeated = False
if hasattr(tool, 'allow_repeated_usage'):
allows_repeated = tool.allow_repeated_usage
elif hasattr(tool, '_tool') and hasattr(tool._tool, 'allow_repeated_usage'):
allows_repeated = tool._tool.allow_repeated_usage
if not allows_repeated and self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
result = self._i18n.errors("task_repeated_usage").format(
tool_names=self.tools_names
@@ -369,6 +375,11 @@ class ToolUsage:
def _format_result(self, result: Any) -> str:
if self.task:
self.task.used_tools += 1
# Handle None results explicitly
if result is None:
result = "No result returned from tool"
if self._should_remember_format():
result = self._remember_format(result=result)
return str(result)
@@ -391,9 +402,19 @@ class ToolUsage:
if not self.tools_handler:
return False
if last_tool_usage := self.tools_handler.last_used_tool:
return (calling.tool_name == last_tool_usage.tool_name) and (
# Add debug logging
print(f"[DEBUG] _check_tool_repeated_usage:")
print(f" Current tool: {calling.tool_name}")
print(f" Current args: {calling.arguments}")
print(f" Last tool: {last_tool_usage.tool_name}")
print(f" Last args: {last_tool_usage.arguments}")
is_repeated = (calling.tool_name == last_tool_usage.tool_name) and (
calling.arguments == last_tool_usage.arguments
)
print(f" Is repeated: {is_repeated}")
return is_repeated
return False
def _check_usage_limit(self, tool: Any, tool_name: str) -> str | None:

View File

@@ -23,6 +23,8 @@ class TestScratchpadTool:
assert "❌ SCRATCHPAD IS EMPTY" in result
assert "does not contain any data yet" in result
assert "Try executing other tools first" in result
assert "💡 TIP:" in result
assert "search, read, or fetch operations" in result
def test_key_not_found_error_message(self):
"""Test error message when key is not found."""
@@ -34,11 +36,14 @@ class TestScratchpadTool:
result = tool._run(key="wrong_key")
assert "❌ KEY NOT FOUND: 'wrong_key'" in result
assert "Available keys:" in result
assert "📦 AVAILABLE KEYS IN SCRATCHPAD:" in result
assert "- 'existing_key'" in result
assert "- 'another_key'" in result
assert 'Example Action Input: {"key": "existing_key"}' in result
assert "Keys are case-sensitive" in result
assert '✅ CORRECT USAGE EXAMPLE:' in result
assert 'Action: Access Scratchpad Memory' in result
assert 'Action Input: {"key": "existing_key"}' in result
assert "⚠️ IMPORTANT:" in result
assert "Keys are case-sensitive and must match EXACTLY" in result
def test_successful_retrieval_string(self):
"""Test successful retrieval of string data."""
@@ -47,7 +52,8 @@ class TestScratchpadTool:
})
result = tool._run(key="message")
assert result == "Hello, World!"
assert "✅ Successfully retrieved data for key 'message':" in result
assert "Hello, World!" in result
def test_successful_retrieval_dict(self):
"""Test successful retrieval of dictionary data."""
@@ -57,6 +63,7 @@ class TestScratchpadTool:
})
result = tool._run(key="user_data")
assert "✅ Successfully retrieved data for key 'user_data':" in result
assert '"name": "John"' in result
assert '"age": 30' in result
@@ -68,6 +75,7 @@ class TestScratchpadTool:
})
result = tool._run(key="items")
assert "✅ Successfully retrieved data for key 'items':" in result
assert '"item1"' in result
assert '"item2"' in result
assert '"item3"' in result
@@ -134,4 +142,35 @@ class TestScratchpadTool:
assert "📌 'nested_dict': list of 3 items" in desc
assert "📌 'empty_list': list of 0 items" in desc
assert "📌 'boolean_value': bool" in desc
assert "📌 'number': int" in desc
assert "📌 'number': int" in desc
def test_similar_key_suggestion(self):
    """Test that similar keys are suggested when a wrong key is used."""
    tool = ScratchpadTool(scratchpad_data={
        "email_search_results": ["email1", "email2"],
        "email_details": {"id": "123"},
        "user_preferences": {"theme": "dark"}
    })
    # Test partial match: "email" is a substring of two stored keys, so both
    # should be offered as suggestions while the unrelated key is excluded.
    result = tool._run(key="email")
    assert "🔍 Did you mean one of these?" in result
    # Check that similar keys are in the suggestions
    # Extract just the "Did you mean" section
    did_you_mean_section = result.split("🔍 Did you mean one of these?")[1].split("✅ CORRECT USAGE EXAMPLE:")[0]
    assert "- 'email_search_results'" in did_you_mean_section
    assert "- 'email_details'" in did_you_mean_section
    assert "- 'user_preferences'" not in did_you_mean_section
    # But user_preferences should still be in the full list
    assert "- 'user_preferences'" in result
    # Test case-insensitive match
    result = tool._run(key="EMAIL_DETAILS")
    assert "🔍 Did you mean one of these?" in result
    assert "- 'email_details'" in result
    # Test no similar keys: a completely unrelated key gets no suggestion block.
    result = tool._run(key="completely_different")
    assert "🔍 Did you mean one of these?" not in result