From e4e9bf343a452714386c209420bb6deb8b439475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Mon, 2 Jun 2025 10:18:50 -0700 Subject: [PATCH] revamp --- src/crewai/agents/agent_state.py | 464 +++++-- src/crewai/agents/crew_agent_executor.py | 1206 +++++++---------- src/crewai/llm.py | 2 +- .../tools/agent_tools/scratchpad_tool.py | 49 +- src/crewai/tools/base_tool.py | 14 +- src/crewai/tools/structured_tool.py | 6 + src/crewai/tools/tool_usage.py | 25 +- .../tools/agent_tools/test_scratchpad_tool.py | 49 +- 8 files changed, 979 insertions(+), 836 deletions(-) diff --git a/src/crewai/agents/agent_state.py b/src/crewai/agents/agent_state.py index b75d81e69..b6f40e4b6 100644 --- a/src/crewai/agents/agent_state.py +++ b/src/crewai/agents/agent_state.py @@ -1,176 +1,386 @@ -"""Agent state management for long-running tasks.""" +"""Agent state management for long-running tasks with focus on progress tracking.""" -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union, Set from pydantic import BaseModel, Field from datetime import datetime +import json -class ToolUsage(BaseModel): - """Record of a single tool usage.""" - tool_name: str = Field(description="Name of the tool used") - arguments: Dict[str, Any] = Field(description="Arguments passed to the tool (may be truncated)") - result_summary: Optional[str] = Field(default=None, description="Brief summary of the tool's result") - timestamp: datetime = Field(default_factory=datetime.now, description="When the tool was used") - step_number: int = Field(description="Which execution step this tool was used in") +class CriterionProgress(BaseModel): + """Progress tracking for a single acceptance criterion.""" + criterion: str = Field(description="The acceptance criterion") + status: str = Field(default="not_started", description="Status: not_started, in_progress, completed") + progress_notes: str = Field(default="", description="Specific progress made towards this criterion") + completion_percentage: int = Field(default=0, description="Estimated completion percentage (0-100)") + remaining_work: str = Field(default="", description="What still needs to be done for this criterion") + + # Enhanced tracking + processed_items: Set[str] = Field(default_factory=set, description="IDs or identifiers of processed items") + total_items_expected: Optional[int] = Field(default=None, description="Total number of items expected (if known)") + items_to_process: List[str] = Field(default_factory=list, description="Queue of specific items to process next") + last_updated: datetime = Field(default_factory=datetime.now) + + +class ProgressLog(BaseModel): + """Single log entry for progress tracking.""" + timestamp: datetime = Field(default_factory=datetime.now) + action: str = Field(description="What action was taken") + result: str = Field(description="Result or outcome of the action") + items_processed: List[str] = Field(default_factory=list, description="Items processed in this action") + criterion: Optional[str] = Field(default=None, description="Related acceptance criterion") class AgentState(BaseModel): - """Persistent state object for agent task execution. + """Enhanced state management with deterministic progress tracking. - This state object helps agents maintain coherence during long-running tasks - by tracking plans, progress, and intermediate results without relying solely - on conversation history. 
+ This state helps agents maintain focus during long executions by tracking + specific progress against each acceptance criterion with detailed logging. """ - # Core fields - original_plan: List[str] = Field( + # Core planning elements + plan: List[str] = Field( default_factory=list, - description="The initial plan from first reasoning pass. Never overwrite unless user requests complete replan" + description="The current plan steps" ) acceptance_criteria: List[str] = Field( default_factory=list, - description="Concrete goals to satisfy for task completion" + description="Concrete criteria that must be met for task completion" ) + # Progress tracking + criteria_progress: Dict[str, CriterionProgress] = Field( + default_factory=dict, + description="Detailed progress for each acceptance criterion" + ) + + # Data storage scratchpad: Dict[str, Any] = Field( default_factory=dict, - description="Agent-defined storage for intermediate results and metadata" + description="Storage for intermediate results and data" ) - tool_usage_history: List[ToolUsage] = Field( + # Simple tracking + current_focus: str = Field( + default="", + description="What the agent should be focusing on right now" + ) + + next_steps: List[str] = Field( default_factory=list, - description="Detailed history of tool usage including arguments and results" + description="Immediate next steps to take" ) - # Additional tracking fields - task_id: Optional[str] = Field( - default=None, - description="ID of the current task being executed" - ) - - created_at: datetime = Field( - default_factory=datetime.now, - description="When this state was created" - ) - - last_updated: datetime = Field( - default_factory=datetime.now, - description="When this state was last modified" - ) - - steps_completed: int = Field( + overall_progress: int = Field( default=0, - description="Number of execution steps completed" + description="Overall task completion percentage (0-100)" ) - def set_original_plan(self, plan: List[str]) -> None: - """Set the original plan (only if not already set).""" - if not self.original_plan: - self.original_plan = plan - self.last_updated = datetime.now() + # Enhanced tracking + progress_logs: List[ProgressLog] = Field( + default_factory=list, + description="Detailed log of all progress made" + ) + + work_queue: List[Dict[str, Any]] = Field( + default_factory=list, + description="Queue of specific work items to process" + ) + + # Metadata tracking + metadata: Dict[str, Any] = Field( + default_factory=dict, + description="Additional metadata for tracking (e.g., total count expectations)" + ) + + def initialize_criteria_progress(self) -> None: + """Initialize progress tracking for all acceptance criteria.""" + for criterion in self.acceptance_criteria: + if criterion not in self.criteria_progress: + self.criteria_progress[criterion] = CriterionProgress(criterion=criterion) + + def update_criterion_progress( + self, + criterion: str, + status: str, + progress_notes: str, + completion_percentage: int, + remaining_work: str, + processed_items: Optional[List[str]] = None, + items_to_process: Optional[List[str]] = None, + total_items_expected: Optional[int] = None + ) -> None: + """Update progress for a specific criterion with enhanced tracking.""" + if criterion in self.criteria_progress: + progress = self.criteria_progress[criterion] + progress.status = status + progress.progress_notes = progress_notes + progress.completion_percentage = max(0, min(100, completion_percentage)) + progress.remaining_work = remaining_work + 
progress.last_updated = datetime.now() + + # Update processed items + if processed_items: + progress.processed_items.update(processed_items) + + # Update items to process queue + if items_to_process is not None: + progress.items_to_process = items_to_process + + # Update total expected if provided + if total_items_expected is not None: + progress.total_items_expected = total_items_expected + + # Recalculate completion percentage based on actual items if possible + if progress.total_items_expected and progress.total_items_expected > 0: + actual_percentage = int((len(progress.processed_items) / progress.total_items_expected) * 100) + progress.completion_percentage = actual_percentage + + # Update overall progress + self._recalculate_overall_progress() + + def _recalculate_overall_progress(self) -> None: + """Recalculate overall progress based on all criteria.""" + if not self.criteria_progress: + self.overall_progress = 0 + return + + total_progress = sum(p.completion_percentage for p in self.criteria_progress.values()) + self.overall_progress = int(total_progress / len(self.criteria_progress)) def add_to_scratchpad(self, key: str, value: Any) -> None: """Add or update a value in the scratchpad.""" self.scratchpad[key] = value - self.last_updated = datetime.now() - def record_tool_usage( - self, - tool_name: str, - arguments: Dict[str, Any], - result_summary: Optional[str] = None, - max_arg_length: int = 200 - ) -> None: - """Record a tool usage with truncated arguments. + # Analyze the data for item tracking + self._analyze_scratchpad_for_items(key, value) - Args: - tool_name: Name of the tool used - arguments: Arguments passed to the tool - result_summary: Optional brief summary of the result - max_arg_length: Maximum length for string arguments before truncation - """ - # Truncate long string arguments to prevent state bloat - truncated_args = {} - for key, value in arguments.items(): - if isinstance(value, str) and len(value) > max_arg_length: - truncated_args[key] = value[:max_arg_length] + "..." 
- elif isinstance(value, (list, dict)): - # For complex types, store a summary - truncated_args[key] = f"<{type(value).__name__} with {len(value)} items>" - else: - truncated_args[key] = value + def _analyze_scratchpad_for_items(self, key: str, value: Any) -> None: + """Analyze scratchpad data to extract trackable items.""" + # If it's a list, try to extract IDs + if isinstance(value, list) and value: + item_ids = [] + for item in value: + if isinstance(item, dict): + # Look for common ID fields + for id_field in ['id', 'ID', 'uid', 'uuid', 'message_id', 'email_id']: + if id_field in item: + item_ids.append(str(item[id_field])) + break - tool_usage = ToolUsage( - tool_name=tool_name, - arguments=truncated_args, - result_summary=result_summary, - step_number=self.steps_completed + if item_ids: + # Store metadata about this list + self.metadata[f"{key}_ids"] = item_ids + self.metadata[f"{key}_count"] = len(value) + + def log_progress(self, action: str, result: str, items_processed: Optional[List[str]] = None, criterion: Optional[str] = None) -> None: + """Add a progress log entry.""" + log_entry = ProgressLog( + action=action, + result=result, + items_processed=items_processed or [], + criterion=criterion ) + self.progress_logs.append(log_entry) - self.tool_usage_history.append(tool_usage) - self.last_updated = datetime.now() + def add_to_work_queue(self, work_item: Dict[str, Any]) -> None: + """Add an item to the work queue.""" + self.work_queue.append(work_item) - def increment_steps(self) -> None: - """Increment the step counter.""" - self.steps_completed += 1 - self.last_updated = datetime.now() + def get_next_work_item(self) -> Optional[Dict[str, Any]]: + """Get and remove the next item from the work queue.""" + if self.work_queue: + return self.work_queue.pop(0) + return None - def reset(self, task_id: Optional[str] = None) -> None: - """Reset state for a new task.""" - self.original_plan = [] - self.acceptance_criteria = [] - self.scratchpad = {} - self.tool_usage_history = [] - self.task_id = task_id - self.created_at = datetime.now() - self.last_updated = datetime.now() - self.steps_completed = 0 + def set_focus_and_next_steps(self, focus: str, next_steps: List[str]) -> None: + """Update current focus and next steps.""" + self.current_focus = focus + self.next_steps = next_steps - def to_context_string(self) -> str: - """Generate a concise string representation for LLM context.""" - context = f"Current State (Step {self.steps_completed}):\n" - context += f"- Task ID: {self.task_id}\n" + def get_progress_context(self) -> str: + """Generate a focused progress update for the agent.""" + context = f"πŸ“Š PROGRESS UPDATE (Overall: {self.overall_progress}%)\n" + context += "="*50 + "\n\n" - if self.acceptance_criteria: - context += "- Acceptance Criteria:\n" - for criterion in self.acceptance_criteria: - context += f" β€’ {criterion}\n" + # Current focus + if self.current_focus: + context += f"🎯 CURRENT FOCUS: {self.current_focus}\n\n" - if self.original_plan: - context += "- Plan:\n" - for i, step in enumerate(self.original_plan, 1): - context += f" {i}. 
{step}\n" + # Progress on each criterion with detailed tracking + if self.criteria_progress: + context += "πŸ“‹ ACCEPTANCE CRITERIA PROGRESS:\n" + for criterion, progress in self.criteria_progress.items(): + status_emoji = "βœ…" if progress.status == "completed" else "πŸ”„" if progress.status == "in_progress" else "⏸️" + context += f"\n{status_emoji} {criterion}\n" - if self.tool_usage_history: - context += "- Recent Tool Usage:\n" - # Show last 5 tool uses - recent_tools = self.tool_usage_history[-5:] - for usage in recent_tools: - context += f" β€’ Step {usage.step_number}: {usage.tool_name}" - if usage.arguments: - args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2]) - context += f"({args_preview})" + # Show detailed progress + if progress.total_items_expected: + context += f" Progress: {len(progress.processed_items)}/{progress.total_items_expected} items ({progress.completion_percentage}%)\n" + else: + context += f" Progress: {progress.completion_percentage}%" + if progress.processed_items: + context += f" - {len(progress.processed_items)} items processed" + context += "\n" + + if progress.progress_notes: + context += f" Notes: {progress.progress_notes}\n" + + # Show next items to process + if progress.items_to_process and progress.status != "completed": + next_items = progress.items_to_process[:3] # Show next 3 + context += f" Next items: {', '.join(next_items)}" + if len(progress.items_to_process) > 3: + context += f" (and {len(progress.items_to_process) - 3} more)" + context += "\n" + + if progress.remaining_work and progress.status != "completed": + context += f" Still needed: {progress.remaining_work}\n" + + # Work queue status + if self.work_queue: + context += f"\nπŸ“ WORK QUEUE: {len(self.work_queue)} items pending\n" + next_work = self.work_queue[0] + context += f" Next: {next_work.get('description', 'Process next item')}\n" + + # Next steps + if self.next_steps: + context += f"\nπŸ“ IMMEDIATE NEXT STEPS:\n" + for i, step in enumerate(self.next_steps, 1): + context += f"{i}. 
{step}\n" + + # Available data + if self.scratchpad: + context += f"\nπŸ’Ύ AVAILABLE DATA IN SCRATCHPAD:\n" + for key, value in self.scratchpad.items(): + if isinstance(value, list): + context += f" β€’ '{key}' - {len(value)} items" + if f"{key}_ids" in self.metadata: + context += f" (IDs tracked)" + context += "\n" + elif isinstance(value, dict): + context += f" β€’ '{key}' - dictionary data\n" + else: + context += f" β€’ '{key}'\n" + + # Recent progress logs + if self.progress_logs: + context += f"\nπŸ“œ RECENT ACTIVITY:\n" + for log in self.progress_logs[-3:]: # Show last 3 logs + context += f" β€’ {log.timestamp.strftime('%H:%M:%S')} - {log.action}" + if log.items_processed: + context += f" ({len(log.items_processed)} items)" context += "\n" - if self.scratchpad: - context += "- Scratchpad:\n" - for key, value in self.scratchpad.items(): - context += f" β€’ {key}: {value}\n" - + context += "\n" + "="*50 return context - def get_tools_summary(self) -> Dict[str, Any]: - """Get a summary of tool usage statistics.""" - if not self.tool_usage_history: - return {"total_tool_uses": 0, "unique_tools": 0, "tools_by_frequency": {}} + def analyze_scratchpad_for_criterion_progress(self, criterion: str) -> Dict[str, Any]: + """Analyze scratchpad data to determine specific progress on a criterion.""" + analysis = { + "relevant_data": [], + "item_count": 0, + "processed_ids": set(), + "data_completeness": 0, + "specific_gaps": [] + } - tool_counts = {} - for usage in self.tool_usage_history: - tool_counts[usage.tool_name] = tool_counts.get(usage.tool_name, 0) + 1 + criterion_lower = criterion.lower() - return { - "total_tool_uses": len(self.tool_usage_history), - "unique_tools": len(set(usage.tool_name for usage in self.tool_usage_history)), - "tools_by_frequency": dict(sorted(tool_counts.items(), key=lambda x: x[1], reverse=True)) - } \ No newline at end of file + # Look for data that relates to this criterion + for key, value in self.scratchpad.items(): + key_lower = key.lower() + + # Check if this data is relevant to the criterion + is_relevant = False + for keyword in criterion_lower.split(): + if len(keyword) > 3 and keyword in key_lower: # Skip short words + is_relevant = True + break + + if is_relevant: + analysis["relevant_data"].append(key) + + # Count items and extract IDs + if isinstance(value, list): + analysis["item_count"] += len(value) + + # Try to extract IDs from metadata + if f"{key}_ids" in self.metadata: + analysis["processed_ids"].update(self.metadata[f"{key}_ids"]) + + elif isinstance(value, dict): + analysis["item_count"] += 1 + + # Calculate completeness based on what we know + if analysis["item_count"] > 0: + # Check if criterion mentions specific numbers + import re + number_match = re.search(r'\b(\d+)\b', criterion) + if number_match: + expected_count = int(number_match.group(1)) + analysis["data_completeness"] = min(100, int((analysis["item_count"] / expected_count) * 100)) + if analysis["item_count"] < expected_count: + analysis["specific_gaps"].append(f"Need {expected_count - analysis['item_count']} more items") + else: + # For criteria without specific numbers, use heuristics + if "all" in criterion_lower or "every" in criterion_lower: + # For "all" criteria, we need to be more careful + analysis["data_completeness"] = 50 if analysis["item_count"] > 0 else 0 + analysis["specific_gaps"].append("Verify all items are included") + else: + analysis["data_completeness"] = min(100, analysis["item_count"] * 20) # Rough estimate + + return analysis + + def 
generate_specific_next_steps(self, criterion: str) -> List[str]: + """Generate specific, actionable next steps for a criterion.""" + analysis = self.analyze_scratchpad_for_criterion_progress(criterion) + progress = self.criteria_progress.get(criterion) + next_steps = [] + + if not progress: + return ["Initialize progress tracking for this criterion"] + + # If we have a queue of items to process + if progress.items_to_process: + next_item = progress.items_to_process[0] + next_steps.append(f"Query/process item: {next_item}") + if len(progress.items_to_process) > 1: + next_steps.append(f"Then process {len(progress.items_to_process) - 1} remaining items") + + # If we have processed some items but not all + elif progress.processed_items and progress.total_items_expected: + remaining = progress.total_items_expected - len(progress.processed_items) + if remaining > 0: + next_steps.append(f"Process {remaining} more items to reach target of {progress.total_items_expected}") + + # If we have data but haven't accessed it + elif analysis["relevant_data"] and not progress.processed_items: + for data_key in analysis["relevant_data"][:2]: # First 2 relevant keys + next_steps.append(f"Access and process data from '{data_key}'") + + # Generic steps based on criterion keywords + else: + criterion_lower = criterion.lower() + if "email" in criterion_lower: + next_steps.append("Use email search/fetch tool to gather emails") + elif "analyze" in criterion_lower or "summary" in criterion_lower: + next_steps.append("Access stored data and create analysis/summary") + else: + next_steps.append(f"Use appropriate tools to gather data for: {criterion}") + + return next_steps + + def reset(self) -> None: + """Reset state for a new task.""" + self.plan = [] + self.acceptance_criteria = [] + self.criteria_progress = {} + self.scratchpad = {} + self.current_focus = "" + self.next_steps = [] + self.overall_progress = 0 + self.progress_logs = [] + self.work_queue = [] + self.metadata = {} \ No newline at end of file diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 8c38d6b7b..f146f5601 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -1,5 +1,5 @@ from typing import Any, Callable, Dict, List, Optional, Union, cast, Tuple -import json +import re from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin @@ -89,11 +89,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = { tool.name: tool for tool in self.tools } - self.steps_since_reasoning = 0 - self.agent_state: AgentState = AgentState( - task_id=str(task.id) if task else None - ) + self.agent_state: AgentState = AgentState() self.scratchpad_tool: Optional[ScratchpadTool] = None + self.max_iterations_exceeded = False existing_stop = self.llm.stop or [] self.llm.stop = list( set( @@ -129,7 +127,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: # Reset agent state for new task execution - self.agent_state.reset(task_id=str(self.task.id) if self.task else None) + self.agent_state.reset() + self.max_iterations_exceeded = False # Reset the flag for new execution if "system" in self.prompt: system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs) @@ -184,9 +183,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): # Extract 
structured plan if available if reasoning_output.plan.structured_plan: - self.agent_state.set_original_plan( - reasoning_output.plan.structured_plan.steps - ) + self.agent_state.plan = reasoning_output.plan.structured_plan.steps self.agent_state.acceptance_criteria = ( reasoning_output.plan.structured_plan.acceptance_criteria ) @@ -209,12 +206,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if len(steps) >= 10: # Limit to 10 steps break if steps: - self.agent_state.set_original_plan(steps) + self.agent_state.plan = steps - # Add state context to messages for coherence - if self.agent_state.original_plan: - state_context = f"Initial plan loaded with {len(self.agent_state.original_plan)} steps." - self._append_message(state_context, role="assistant") + # Initialize progress tracking for criteria + if self.agent_state.acceptance_criteria: + self.agent_state.initialize_criteria_progress() + + # Set initial focus + self.agent_state.set_focus_and_next_steps( + focus=f"Starting task execution to meet {len(self.agent_state.acceptance_criteria)} acceptance criteria", + next_steps=["Begin with the first step of the plan", "Use appropriate tools to gather required data"] + ) # Clear the reasoning output to avoid using it again self.agent._last_reasoning_output = None @@ -237,14 +239,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if has_reached_max_iterations(self.iterations, self.max_iter): print(f"[DEBUG] Max iterations reached") - formatted_answer = handle_max_iterations_exceeded( - formatted_answer, - printer=self._printer, - i18n=self._i18n, - messages=self.messages, - llm=self.llm, - callbacks=self.callbacks, + self.max_iterations_exceeded = True # Set flag + + # Add informative message about skipping validation + if self.agent_state.acceptance_criteria: + self._printer.print( + content="\n⚠️ Max iterations reached - forcing completion without acceptance criteria validation", + color="yellow" + ) + + # Directly create a final answer based on current progress + # Extract any existing data from scratchpad or messages + final_output = self._create_forced_final_answer() + + formatted_answer = AgentFinish( + thought="Maximum iterations reached - compiling available results", + output=final_output, + text=final_output ) + break # Exit the loop immediately with the forced answer enforce_rpm_limit(self.request_within_rpm_limit) @@ -265,7 +278,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): print(f"[DEBUG] Agent trying to finish - checking acceptance criteria") # Validate acceptance criteria if reasoning is enabled and criteria exist if (hasattr(self.agent, "reasoning") and self.agent.reasoning - and self.agent_state.acceptance_criteria): + and self.agent_state.acceptance_criteria + and not self.max_iterations_exceeded): self._printer.print( content="\nValidating acceptance criteria before finalizing...", @@ -341,85 +355,67 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): formatted_answer, tool_result ) - # Record detailed tool usage in agent state - if hasattr(formatted_answer, "tool") and formatted_answer.tool: - # Extract tool arguments from the agent action - tool_args = {} - if ( - hasattr(formatted_answer, "tool_input") - and formatted_answer.tool_input - ): - if isinstance(formatted_answer.tool_input, dict): - tool_args = formatted_answer.tool_input - elif isinstance(formatted_answer.tool_input, str): - # Try to parse JSON if it's a string - try: - import json + # Special handling for scratchpad tool errors + if (formatted_answer.tool == "Access Scratchpad Memory" + and 
tool_result + and hasattr(tool_result, 'result') + and isinstance(tool_result.result, str) + and "❌ KEY NOT FOUND:" in tool_result.result): + # Extract available keys from the error message + error_lines = tool_result.result.split('\n') + keys_section_start = False + available_keys = [] - tool_args = json.loads(formatted_answer.tool_input) - except (json.JSONDecodeError, TypeError): - tool_args = {"input": formatted_answer.tool_input} + for line in error_lines: + if "AVAILABLE KEYS IN SCRATCHPAD:" in line: + keys_section_start = True + continue + if keys_section_start and line.strip().startswith("- '"): + key_match = re.search(r"- '([^']+)'", line) + if key_match: + available_keys.append(key_match.group(1)) + elif keys_section_start and not line.strip().startswith("- "): + break - # Truncate result for summary - result_summary = None - if tool_result and hasattr(tool_result, "result"): - result_str = str(tool_result.result) - result_summary = ( - result_str[:200] + "..." - if len(result_str) > 200 - else result_str + if available_keys: + system_msg = ( + f"⚠️ SCRATCHPAD ACCESS ERROR - PAY ATTENTION!\n\n" + f"You tried to access a key that doesn't exist.\n" + f"HERE ARE THE CORRECT KEYS YOU MUST USE:\n" + f"{chr(10).join(f' βœ“ {key}' for key in available_keys)}\n\n" + f"NEXT ACTION: Use 'Access Scratchpad Memory' with one of the keys above.\n" + f"Example: Action Input: {{\"key\": \"{available_keys[0]}\"}}" ) + self._append_message(system_msg, role="system") - # Record the tool usage with arguments - self.agent_state.record_tool_usage( + # Extract to scratchpad if reasoning is enabled and tool was successful + if ( + hasattr(self.agent, "reasoning") + and self.agent.reasoning + and tool_result + and formatted_answer.tool != "Access Scratchpad Memory" + and self._is_tool_execution_successful(tool_result) + ): + print(f"[DEBUG] Starting scratchpad extraction for {formatted_answer.tool}") + self._extract_tool_result_to_scratchpad( tool_name=formatted_answer.tool, - arguments=tool_args, - result_summary=result_summary, + tool_args=getattr(formatted_answer, 'tool_input', {}), + tool_result=tool_result, ) - # Extract relevant information to scratchpad if reasoning is enabled - # Note: This only happens for actual tool executions (AgentAction with tool), - # not for reasoning steps or other agent outputs - if ( - hasattr(self.agent, "reasoning") - and self.agent.reasoning - and tool_result - and formatted_answer.tool != "Access Scratchpad Memory" # Skip scratchpad tool itself - and self._is_tool_execution_successful(tool_result) # Only for successful executions - ): - print(f"[DEBUG] Starting scratchpad extraction for {formatted_answer.tool}") - self._extract_tool_result_to_scratchpad( - tool_name=formatted_answer.tool, - tool_args=tool_args, - tool_result=tool_result, - ) - - # Always print agent state (temporary debug) - print( - f"Agent State:\n" - f"Raw: {self.agent_state.model_dump_json()}\n" - f"[AGENT STATE] Step {self.agent_state.steps_completed}:\n" - f"Original Plan: {getattr(self.agent_state, 'original_plan', None)}\n" - f"Tool: {formatted_answer.tool}\n" - f"Scratchpad: {self.agent_state.scratchpad}\n" - f"Tool History: {len(self.agent_state.tool_usage_history)} entries\n" - f"Full State Context:\n{self.agent_state.to_context_string()}" - ) - - # Increment steps in agent state - self.agent_state.increment_steps() + # After each step, update progress tracking + if hasattr(self.agent, "reasoning") and self.agent.reasoning and self.agent_state.acceptance_criteria: + 
self._update_progress_tracking() # Update scratchpad tool if it exists if self.scratchpad_tool and self.agent_state.scratchpad: print(f"[DEBUG] Updating scratchpad tool") self._update_scratchpad_tool() - if self._should_trigger_reasoning(): - print(f"[DEBUG] Triggering mid-execution reasoning") - self._handle_mid_execution_reasoning() - print(f"[DEBUG] Mid-execution reasoning completed") - else: - self.steps_since_reasoning += 1 + # Inject progress context for the next iteration + if self.agent_state.acceptance_criteria: + progress_context = self.agent_state.get_progress_context() + self._append_message(progress_context, role="system") self._invoke_step_callback(formatted_answer) self._append_message(formatted_answer.text, role="assistant") @@ -658,269 +654,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): color="red", ) - def _should_trigger_reasoning(self) -> bool: - """ - Determine if mid-execution reasoning should be triggered. - - Returns: - bool: True if reasoning should be triggered, False otherwise. - """ - if self.iterations == 0: - return False - - if not hasattr(self.agent, "reasoning") or not self.agent.reasoning: - return False - - if ( - hasattr(self.agent, "reasoning_interval") - and self.agent.reasoning_interval is not None - ): - return self.steps_since_reasoning >= self.agent.reasoning_interval - - if hasattr(self.agent, "adaptive_reasoning") and self.agent.adaptive_reasoning: - return self._should_adaptive_reason() - - return False - - def _should_adaptive_reason(self) -> bool: - """ - Determine if adaptive reasoning should be triggered using LLM decision. - Fallback to error detection if LLM decision fails. - - Returns: - bool: True if adaptive reasoning should be triggered, False otherwise. - """ - if self._has_recent_errors(): - try: - from crewai.utilities.events.reasoning_events import ( - AgentAdaptiveReasoningDecisionEvent, - ) - from crewai.utilities.events.crewai_event_bus import crewai_event_bus - - crewai_event_bus.emit( - self.agent, - AgentAdaptiveReasoningDecisionEvent( - agent_role=self.agent.role, - task_id=str(self.task.id), - should_reason=True, - reasoning="Recent error indicators detected in previous messages.", - ), - ) - except Exception: - pass - return True - - try: - from crewai.utilities.reasoning_handler import AgentReasoning - from crewai.agent import Agent - - current_progress = self._summarize_current_progress() - - # Build detailed tools used list from agent state - tools_used_detailed = [] - for usage in self.agent_state.tool_usage_history: - tool_desc = f"{usage.tool_name}" - if usage.arguments: - args_preview = ", ".join( - f"{k}={v}" for k, v in list(usage.arguments.items())[:2] - ) - tool_desc += f"({args_preview})" - tools_used_detailed.append(tool_desc) - - # Get tool usage statistics and patterns - tool_stats = self.agent_state.get_tools_summary() - - # Detect patterns in tool usage - tool_patterns = self._detect_tool_patterns() - if tool_patterns: - tool_stats["recent_patterns"] = tool_patterns - - reasoning_handler = AgentReasoning( - task=self.task, agent=cast(Agent, self.agent) - ) - - return reasoning_handler.should_adaptive_reason_llm( - current_steps=self.iterations, - tools_used=tools_used_detailed, - current_progress=current_progress, - tool_usage_stats=tool_stats, - ) - except Exception as e: - self._printer.print( - content=f"Error during adaptive reasoning decision: {str(e)}. 
Using fallback error detection.", - color="yellow", - ) - return False - - def _detect_tool_patterns(self) -> Optional[str]: - """ - Detect patterns in recent tool usage that might indicate issues. - - Returns: - Optional[str]: Description of detected patterns, or None - """ - if not self.agent_state.tool_usage_history: - return None - - patterns = [] - - # Check for repeated use of the same tool with similar arguments - recent_tools = ( - self.agent_state.tool_usage_history[-5:] - if len(self.agent_state.tool_usage_history) >= 5 - else self.agent_state.tool_usage_history - ) - - # Count consecutive uses of the same tool - if len(recent_tools) >= 2: - consecutive_count = 1 - for i in range(1, len(recent_tools)): - if recent_tools[i].tool_name == recent_tools[i - 1].tool_name: - consecutive_count += 1 - if consecutive_count >= 3: - patterns.append( - f"Same tool ({recent_tools[i].tool_name}) used {consecutive_count} times consecutively" - ) - else: - consecutive_count = 1 - - # Check for tools with empty or error results - error_count = 0 - for usage in recent_tools: - if usage.result_summary and any( - keyword in usage.result_summary.lower() - for keyword in ["error", "failed", "not found", "empty"] - ): - error_count += 1 - - if error_count >= 2: - patterns.append( - f"{error_count} tools returned errors or empty results recently" - ) - - # Check for rapid tool switching (might indicate confusion) - if ( - len(set(usage.tool_name for usage in recent_tools)) == len(recent_tools) - and len(recent_tools) >= 4 - ): - patterns.append( - "Rapid switching between different tools without repetition" - ) - - return "; ".join(patterns) if patterns else None - - def _handle_mid_execution_reasoning(self) -> None: - """ - Handle mid-execution reasoning by calling the reasoning handler. 
- """ - if not hasattr(self.agent, "reasoning") or not self.agent.reasoning: - return - - try: - from crewai.utilities.reasoning_handler import AgentReasoning - - current_progress = self._summarize_current_progress() - - # Include agent state in progress summary - state_info = f"\n\n{self.agent_state.to_context_string()}" - current_progress += state_info - - from crewai.agent import Agent - - reasoning_handler = AgentReasoning( - task=self.task, agent=cast(Agent, self.agent) - ) - - # Build detailed tools used list from agent state - tools_used_detailed = [] - for usage in self.agent_state.tool_usage_history: - tool_desc = f"{usage.tool_name}" - if usage.arguments: - args_preview = ", ".join( - f"{k}={v}" for k, v in list(usage.arguments.items())[:2] - ) - tool_desc += f"({args_preview})" - tools_used_detailed.append(tool_desc) - - reasoning_output = reasoning_handler.handle_mid_execution_reasoning( - current_steps=self.iterations, - tools_used=tools_used_detailed, - current_progress=current_progress, - iteration_messages=self.messages, - ) - - # Update acceptance criteria if they changed from the reasoning output - if reasoning_output.plan.structured_plan: - if reasoning_output.plan.structured_plan.acceptance_criteria: - self.agent_state.acceptance_criteria = ( - reasoning_output.plan.structured_plan.acceptance_criteria - ) - - # Don't add reasoning metadata to scratchpad - keep it exclusively for tool results - - updated_plan_msg = ( - self._i18n.retrieve( - "reasoning", "mid_execution_reasoning_update" - ).format(plan=reasoning_output.plan.plan) - + f"\n\nUpdated State:\n{self.agent_state.to_context_string()}" - + "\n\nRemember: strictly follow the updated plan above and ensure the final answer fully meets the EXPECTED OUTPUT criteria." - ) - - self._append_message(updated_plan_msg, role="assistant") - - self.steps_since_reasoning = 0 - - except Exception as e: - self._printer.print( - content=f"Error during mid-execution reasoning: {str(e)}", - color="red", - ) - - def _summarize_current_progress(self) -> str: - """ - Create a summary of the current execution progress. - - Returns: - str: A summary of the current progress. - """ - recent_messages = ( - self.messages[-5:] if len(self.messages) >= 5 else self.messages - ) - - summary = f"After {self.iterations} steps, " - - # Use tool usage history from agent state for better context - if self.agent_state.tool_usage_history: - tool_summary = self.agent_state.get_tools_summary() - summary += f"I've used {tool_summary['total_tool_uses']} tools ({tool_summary['unique_tools']} unique). " - - # Include most frequently used tools - if tool_summary["tools_by_frequency"]: - top_tools = list(tool_summary["tools_by_frequency"].items())[:3] - tools_str = ", ".join(f"{tool} ({count}x)" for tool, count in top_tools) - summary += f"Most used: {tools_str}. " - - # Include details of the last tool use - if self.agent_state.tool_usage_history: - last_tool = self.agent_state.tool_usage_history[-1] - summary += f"Last tool: {last_tool.tool_name}" - if last_tool.arguments: - args_str = ", ".join( - f"{k}={v}" for k, v in list(last_tool.arguments.items())[:2] - ) - summary += f" with args ({args_str})" - summary += ". " - else: - summary += "I haven't used any tools yet. " - - if recent_messages: - last_message = recent_messages[-1].get("content", "") - if len(last_message) > 100: - last_message = last_message[:100] + "..." 
- summary += f"Most recent action: {last_message}" - - return summary - def _has_recent_errors(self) -> bool: """Check for error indicators in recent messages.""" error_indicators = ["error", "exception", "failed", "unable to", "couldn't"] @@ -937,256 +670,81 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def _extract_tool_result_to_scratchpad( self, tool_name: str, tool_args: Dict[str, Any], tool_result: ToolResult ) -> None: - """Extract relevant information from tool result using LLM and add to scratchpad. - - This method uses the agent's LLM to intelligently extract and summarize - important information from tool results, storing it in the agent's scratchpad - for future reference during task execution. - - Args: - tool_name: Name of the tool that was executed - tool_args: Arguments that were passed to the tool - tool_result: The result returned by the tool - """ + """Extract relevant information from tool result and add to scratchpad with progress logging.""" print(f"[DEBUG] _extract_tool_result_to_scratchpad started for tool: {tool_name}") try: - # Check result size and potentially skip LLM extraction for very large results - result_str = str(tool_result.result) - result_size = len(result_str) - print(f"[DEBUG] Tool result size: {result_size} characters") + # Generate a meaningful key from tool name + scratchpad_key = tool_name.lower().replace(" ", "_").replace("_tool", "") - # For very large results (>100KB), skip LLM extraction and store directly - if result_size > 100000: - print(f"[DEBUG] Result too large ({result_size} chars), storing directly without LLM extraction") - scratchpad_key = tool_name.replace("_", "") + # Store the result data + if hasattr(tool_result, 'result'): + result_data = tool_result.result - # Try to parse as JSON if possible - try: - if isinstance(tool_result.result, str): - result_data = json.loads(tool_result.result) - else: - result_data = tool_result.result - except: - result_data = tool_result.result + # Try to parse JSON if it's a string + if isinstance(result_data, str): + try: + import json + parsed_data = json.loads(result_data) + result_data = parsed_data + except: + pass # Keep as string if not parseable - self.agent_state.add_to_scratchpad( - scratchpad_key, - { - "data": result_data, - "tool": tool_name, - "tool_args": tool_args, - "large_result": True, - "size": result_size - } - ) - print(f"[DEBUG] Large result stored directly to scratchpad") - return + # Store in scratchpad + self.agent_state.add_to_scratchpad(scratchpad_key, result_data) + print(f"[DEBUG] Stored result to scratchpad with key: {scratchpad_key}") - # Create a prompt for the LLM to extract relevant information - result_preview = str(tool_result.result)[:200] + "..." if len(str(tool_result.result)) > 200 else str(tool_result.result) - print(f"[DEBUG] Tool result preview: {result_preview}") + # Extract item count and IDs for logging + items_processed = [] + if isinstance(result_data, list): + # Try to extract IDs from list items + for item in result_data[:10]: # Limit to first 10 for logging + if isinstance(item, dict): + for id_field in ['id', 'ID', 'uid', 'uuid', 'message_id', 'email_id']: + if id_field in item: + items_processed.append(str(item[id_field])) + break - extraction_prompt = f"""Given the following tool execution result, extract and summarize the most relevant information that would be useful for completing the current task. 
- -Tool Name: {tool_name} -Tool Arguments: {json.dumps(tool_args, indent=2) if tool_args else "None"} -Tool Result: {tool_result.result} - -Current Task Context: -- Task Description: {self.task.description if self.task else "Not specified"} -- Expected Output: {self.task.expected_output if self.task else "Not specified"} -- Steps Completed: {self.agent_state.steps_completed} - -Instructions: -1. Identify the KEY INFORMATION from the tool result that directly relates to the task -2. Extract any important data points, facts, or findings -3. Note any errors, warnings, or issues that might affect task completion -4. Summarize in a concise format (max 3-5 bullet points) -5. Focus on information that will be useful for subsequent steps -6. Generate a descriptive key name that explains what data is being stored (e.g., "email_and_thread_ids", "search_results", "file_contents", etc.) -7. IMPORTANT: When extracting data_points, include ALL items from lists or collections, do not truncate or summarize the data - -Respond in the following JSON format: -{{ - "suggested_key_name": "descriptive_name_for_this_data", - "key_findings": ["finding1", "finding2", ...], - "data_points": {{"key": "value", ...}} or [list of items], - "issues": ["issue1", "issue2", ...] or null if none, - "relevance_score": 1-10 (how relevant this result is to the task) -}} - -Note: For data_points, preserve the complete data structure. If it's a list of items (like email IDs, search results, etc.), include ALL items.""" - - # Create messages for LLM call - messages = [format_message_for_llm(extraction_prompt, role="user")] - - # Call LLM to extract information - try: - print(f"[DEBUG] Calling LLM for scratchpad extraction...") - extraction_response = get_llm_response( - llm=self.llm, - messages=messages, - callbacks=self.callbacks, - printer=self._printer, - ) - print(f"[DEBUG] LLM extraction response received, length: {len(extraction_response)}") - - # Try to parse the JSON response directly - try: - extracted_info = json.loads(extraction_response) - print(f"[DEBUG] Successfully parsed JSON directly") - except json.JSONDecodeError: - print(f"[DEBUG] Failed to parse JSON directly, trying to extract from markdown...") - # If direct parsing fails, try to extract JSON from the response - # The LLM might have wrapped it in markdown code blocks or added extra text - json_match = None - - # Try to find JSON in markdown code blocks - import re - - json_pattern = r"```(?:json)?\s*(\{.*?\})\s*```" - matches = re.findall(json_pattern, extraction_response, re.DOTALL) - - if matches: - print(f"[DEBUG] Found {len(matches)} JSON blocks in markdown") - # Try to parse the first match - for match in matches: - try: - json_match = json.loads(match) - print(f"[DEBUG] Successfully parsed JSON from markdown") - break - except json.JSONDecodeError: - continue - - # If no markdown JSON found, try to find raw JSON object - if not json_match: - print(f"[DEBUG] No markdown JSON found, looking for raw JSON...") - # Look for JSON object in the response - json_start = extraction_response.find("{") - json_end = extraction_response.rfind("}") - if ( - json_start != -1 - and json_end != -1 - and json_end > json_start - ): - try: - potential_json = extraction_response[ - json_start : json_end + 1 - ] - json_match = json.loads(potential_json) - print(f"[DEBUG] Successfully extracted raw JSON") - except json.JSONDecodeError: - print(f"[DEBUG] Failed to parse raw JSON") - pass - - if json_match: - extracted_info = json_match - else: - # Couldn't parse JSON, raise 
to trigger fallback - print(f"[DEBUG] Could not extract any valid JSON, triggering fallback") - raise json.JSONDecodeError( - "Could not extract JSON", extraction_response, 0 - ) - - # Process the extracted info - # Use the suggested key name or fall back to default - suggested_key = extracted_info.get("suggested_key_name", "") - if suggested_key and suggested_key.replace("_", "").isalnum(): - scratchpad_key = suggested_key - else: - # Generate a meaningful key from tool name - scratchpad_key = tool_name.replace("_", "") - print(f"[DEBUG] Using scratchpad key: {scratchpad_key}") - - # Get the data points - data_points = extracted_info.get("data_points", {}) - - # Simplify the data structure based on what's extracted - if isinstance(data_points, list): - # If data_points is already a list, store it directly - data_to_store = data_points - elif isinstance(data_points, dict) and len(data_points) == 1: - # If it's a dict with a single key containing a list, extract the list - single_key = list(data_points.keys())[0] - if isinstance(data_points[single_key], list): - data_to_store = data_points[single_key] - else: - data_to_store = data_points - else: - data_to_store = data_points - - # Store based on relevance score - relevance_score = extracted_info.get("relevance_score", 0) - print(f"[DEBUG] Relevance score: {relevance_score}") - - if relevance_score >= 7: - # For high relevance, store just the data - self.agent_state.add_to_scratchpad(scratchpad_key, data_to_store) - print(f"[DEBUG] Stored high relevance data to scratchpad") - else: - # For lower relevance, include more context - self.agent_state.add_to_scratchpad( - scratchpad_key, - { - "data": data_to_store, - "tool": tool_name, - "findings": extracted_info.get("key_findings", []), - "relevance": relevance_score, - }, + # Log progress with item details + self.agent_state.log_progress( + action=f"Executed {tool_name}", + result=f"Retrieved {len(result_data)} items and stored in scratchpad", + items_processed=items_processed[:10] # Log up to 10 IDs ) - print(f"[DEBUG] Stored lower relevance data with context to scratchpad") - # Also store key findings if present and relevance is high - if relevance_score >= 7 and extracted_info.get("key_findings"): - current_findings = self.agent_state.scratchpad.get( - "key_findings", [] + self._printer.print( + content=f"βœ“ Stored {len(result_data)} items from {tool_name} to scratchpad", + color="green" ) - current_findings.extend(extracted_info["key_findings"]) - self.agent_state.add_to_scratchpad( - "key_findings", current_findings[-10:] + elif isinstance(result_data, dict): + # For dict results, log the action + self.agent_state.log_progress( + action=f"Executed {tool_name}", + result=f"Retrieved data object and stored in scratchpad" ) - print(f"[DEBUG] Updated key findings in scratchpad") - except (json.JSONDecodeError, KeyError, TypeError) as e: - print(f"[DEBUG] Exception during extraction: {type(e).__name__}: {str(e)}") - # Fallback for when we can't extract structured data - # Try to generate a meaningful key name from tool name - scratchpad_key = tool_name.replace("_", "") - - # Store the complete result without truncation - self.agent_state.add_to_scratchpad( - scratchpad_key, - { - "raw_response": extraction_response, # Store complete response - "tool_result": tool_result.result, # Store complete result - "extraction_failed": True, - "tool_args": tool_args - }, - ) - print(f"[DEBUG] Stored fallback data to scratchpad") + self._printer.print( + content=f"βœ“ Stored data from {tool_name} to 
scratchpad", + color="green" + ) + else: + # For other types, just log + self.agent_state.log_progress( + action=f"Executed {tool_name}", + result=f"Stored result in scratchpad" + ) except Exception as e: - # Log error but don't fail the entire execution print(f"[DEBUG] Failed to extract tool result: {type(e).__name__}: {str(e)}") + # Log error but don't fail the entire execution self._printer.print( content=f"Failed to extract tool result to scratchpad: {str(e)}", - color="yellow", + color="yellow" ) - # Still store complete information even if extraction fails - fallback_key = f"{tool_name}_raw_{self.agent_state.steps_completed}" - self.agent_state.add_to_scratchpad( - fallback_key, - { - "error": f"Extraction failed: {str(e)}", - "tool_result": tool_result.result, # Store complete result - "tool_name": tool_name, - "tool_args": tool_args, - "raw_data": True - }, - ) - print(f"[DEBUG] Stored error fallback data to scratchpad") - print(f"[DEBUG] _extract_tool_result_to_scratchpad completed") + self.agent_state.log_progress( + action=f"Failed to extract {tool_name} result", + result=str(e) + ) def _update_scratchpad_tool(self) -> None: """Update the scratchpad tool with current state data.""" @@ -1199,18 +757,59 @@ Note: For data_points, preserve the complete data structure. If it's a list of i # Find and update the tool in our tools list for i, tool in enumerate(self.tools): if hasattr(tool, 'name') and tool.name == self.scratchpad_tool.name: - # Update the description on the existing tool reference - if hasattr(tool, '_tool') and hasattr(tool._tool, 'description'): + # Update the underlying tool reference + if hasattr(tool, '_tool'): + # Update the wrapped tool's scratchpad data + tool._tool.scratchpad_data = self.agent_state.scratchpad tool._tool.description = self.scratchpad_tool.description - elif hasattr(tool, 'description'): + # Also update the wrapper's description + tool.description = self.scratchpad_tool.description + elif hasattr(tool, 'scratchpad_data'): + # Direct update if it's the tool itself + tool.scratchpad_data = self.agent_state.scratchpad tool.description = self.scratchpad_tool.description break # Regenerate tools description to reflect the updated tool self.tools_description = render_text_description_and_args(self.tools) + # Add a message to inform the agent about available scratchpad keys + if self.agent_state.scratchpad: + keys_info = self._get_scratchpad_keys_info() + if keys_info: + scratchpad_update_msg = ( + f"\nπŸ’Ύ SCRATCHPAD UPDATE: New data has been stored in your scratchpad memory.\n" + f"{keys_info}\n" + f"Use 'Access Scratchpad Memory' tool with the exact key name to retrieve any of this data." 
+ ) + self._append_message(scratchpad_update_msg, role="system") + + def _get_scratchpad_keys_info(self) -> str: + """Get formatted information about available scratchpad keys.""" + if not self.agent_state.scratchpad: + return "" + + keys_info = [] + for key, value in self.agent_state.scratchpad.items(): + # Create a brief description of what's stored + if isinstance(value, dict): + if 'data' in value and isinstance(value['data'], list): + preview = f"list of {len(value['data'])} items" + else: + preview = f"dict with {len(value)} fields" + elif isinstance(value, list): + preview = f"list of {len(value)} items" + elif isinstance(value, str): + preview = f"string ({len(value)} chars)" + else: + preview = type(value).__name__ + + keys_info.append(f" β€’ '{key}': {preview}") + + return "Available keys:\n" + "\n".join(keys_info) + def _validate_acceptance_criteria(self, output: str) -> Tuple[bool, List[str]]: - """Validate if the output meets acceptance criteria. + """Validate if the output meets acceptance criteria using enhanced tracking. Args: output: The final output to validate @@ -1220,90 +819,91 @@ Note: For data_points, preserve the complete data structure. If it's a list of i """ print(f"[DEBUG] _validate_acceptance_criteria started") if not self.agent_state.acceptance_criteria: - # No criteria to validate print(f"[DEBUG] No acceptance criteria to validate") return True, [] - # Create a single prompt to check all criteria - criteria_list = "\n".join( - f"{i}. {criterion}" - for i, criterion in enumerate(self.agent_state.acceptance_criteria, 1) - ) - print(f"[DEBUG] Validating {len(self.agent_state.acceptance_criteria)} criteria") + unmet_criteria = [] - validation_prompt = f"""Given the following task output and acceptance criteria, identify which criteria have NOT been met. + # First, try deterministic validation based on tracked items + for criterion in self.agent_state.acceptance_criteria: + progress = self.agent_state.criteria_progress.get(criterion) -Task Output: -{output} + if progress: + # Use deterministic checks first + is_met = False + reason = "" -Expected Output Description: -{self.task.expected_output if self.task else "Not specified"} + # Check if we have expected item counts + if progress.total_items_expected is not None: + if len(progress.processed_items) >= progress.total_items_expected: + is_met = True + else: + reason = f"Only processed {len(progress.processed_items)}/{progress.total_items_expected} items" -Acceptance Criteria: -{criteria_list} + # Check completion percentage + elif progress.completion_percentage >= 95: # Allow slight margin + is_met = True -For each criterion, determine if it has been met or not met in the output. -Respond with a JSON object where keys are criterion numbers (1, 2, 3, etc.) 
and values are: -- "MET" if the criterion is satisfied -- "NOT MET: " if the criterion is not satisfied + # Check if marked as completed with items processed + elif progress.status == "completed" and progress.processed_items: + is_met = True -Example response format: -{{ - "1": "MET", - "2": "NOT MET: Missing specific examples", - "3": "MET" -}} -""" + # For criteria without specific tracking, fall back to content analysis + else: + is_met = self._validate_criterion_by_content(criterion, output) - try: - print(f"[DEBUG] Calling LLM for criteria validation...") - response = self.llm.call([ - {"role": "user", "content": validation_prompt} - ]) - print(f"[DEBUG] LLM validation response received") - - # Parse the response as JSON - import json - response_str = str(response).strip() - - # Try to extract JSON from the response - json_start = response_str.find('{') - json_end = response_str.rfind('}') + 1 - if json_start >= 0 and json_end > json_start: - json_str = response_str[json_start:json_end] - validation_results = json.loads(json_str) - print(f"[DEBUG] Successfully parsed validation JSON") - else: - # Fallback if JSON not found - self._logger.log("warning", f"Could not parse validation response as JSON: {response_str}") - print(f"[DEBUG] Failed to parse validation response as JSON") - # Assume all criteria not met if we can't parse - return False, self.agent_state.acceptance_criteria - - # Process results - unmet_criteria = [] - for i, criterion in enumerate(self.agent_state.acceptance_criteria, 1): - result = validation_results.get(str(i), "NOT MET") - if isinstance(result, str) and result.upper().startswith("NOT MET"): + if not is_met: unmet_criteria.append(criterion) - self._printer.print( - content=f"βœ— Criterion not met: {criterion}", - color="yellow" - ) + if reason: + self._printer.print( + content=f"βœ— Criterion not met: {criterion} - {reason}", + color="yellow" + ) + else: + self._printer.print( + content=f"βœ— Criterion not met: {criterion}", + color="yellow" + ) else: self._printer.print( content=f"βœ“ Criterion met: {criterion}", color="green" ) + else: + # No progress tracked for this criterion + if not self._validate_criterion_by_content(criterion, output): + unmet_criteria.append(criterion) + self._printer.print( + content=f"βœ— Criterion not met: {criterion} - No progress tracked", + color="yellow" + ) - print(f"[DEBUG] Validation complete: {len(unmet_criteria)} unmet criteria") - return len(unmet_criteria) == 0, unmet_criteria + print(f"[DEBUG] Validation complete: {len(unmet_criteria)} unmet criteria") - except Exception as e: - print(f"[DEBUG] Error validating criteria: {type(e).__name__}: {str(e)}") - self._logger.log("warning", f"Error validating criteria: {str(e)}") - # If we can't validate, assume all criteria are not met to be safe - return False, self.agent_state.acceptance_criteria + # Log validation result + self.agent_state.log_progress( + action="Validation check", + result=f"{len(self.agent_state.acceptance_criteria) - len(unmet_criteria)}/{len(self.agent_state.acceptance_criteria)} criteria met" + ) + + return len(unmet_criteria) == 0, unmet_criteria + + def _validate_criterion_by_content(self, criterion: str, output: str) -> bool: + """Validate a single criterion by analyzing the output content.""" + criterion_lower = criterion.lower() + output_lower = output.lower() + + # Look for key indicators in the output + if "all" in criterion_lower or "every" in criterion_lower: + # For "all" criteria, check if output indicates completeness + 
completeness_indicators = ["all", "every", "complete", "total", "full"] + return any(indicator in output_lower for indicator in completeness_indicators) + + # Check if key terms from criterion appear in output + important_words = [word for word in criterion_lower.split() if len(word) > 3] + matches = sum(1 for word in important_words if word in output_lower) + + return matches >= len(important_words) * 0.7 # 70% of important words should match def _create_criteria_retry_prompt(self, unmet_criteria: List[str]) -> str: """Create a prompt to retry task with unmet criteria. @@ -1321,26 +921,43 @@ Example response format: # Build information about what's in the scratchpad scratchpad_info = "" scratchpad_data_summary = "" + example_usage = "" if self.scratchpad_tool and self.agent_state.scratchpad: scratchpad_keys = list(self.agent_state.scratchpad.keys()) - scratchpad_info = f""" -πŸ“¦ YOUR SCRATCHPAD CONTAINS DATA: -{chr(10).join(f" β€’ '{key}'" for key in scratchpad_keys)} -TO ACCESS THIS DATA: Use the "Access Scratchpad Memory" tool with the key name. -Example: -Action: Access Scratchpad Memory -Action Input: {{"key": "{scratchpad_keys[0] if scratchpad_keys else 'key_name'}"}} -""" - # Add summary of what's in scratchpad - for key in scratchpad_keys[:3]: # Show first 3 keys as examples + # Create detailed summary of each key + key_details = [] + for key in scratchpad_keys: value = self.agent_state.scratchpad[key] if isinstance(value, list): - scratchpad_data_summary += f"\n - '{key}': contains {len(value)} items" + key_details.append(f" β€’ '{key}': contains {len(value)} items (list)") elif isinstance(value, dict): - scratchpad_data_summary += f"\n - '{key}': contains data with {len(value)} fields" + if 'data' in value and isinstance(value['data'], list): + key_details.append(f" β€’ '{key}': contains {len(value['data'])} items (nested list)") + else: + key_details.append(f" β€’ '{key}': contains data with {len(value)} fields (dict)") else: - scratchpad_data_summary += f"\n - '{key}': contains stored data" + key_details.append(f" β€’ '{key}': contains stored data") + + scratchpad_info = f""" +πŸ“¦ YOUR SCRATCHPAD CONTAINS THE FOLLOWING DATA: +{chr(10).join(key_details)} + +πŸ”‘ TO ACCESS THIS DATA: Use the "Access Scratchpad Memory" tool with the EXACT key name. +""" + + # Provide specific example based on first key + if scratchpad_keys: + example_key = scratchpad_keys[0] + example_usage = f""" +πŸ“‹ EXAMPLE - How to retrieve scratchpad data: + +Thought: I need to access the {example_key} from my scratchpad +Action: Access Scratchpad Memory +Action Input: {{"key": "{example_key}"}} + +⚠️ REMEMBER: Use the EXACT key name as shown above! +""" # Analyze what's missing based on criteria missing_data_hints = [] @@ -1364,12 +981,8 @@ Action Input: {{"key": "{scratchpad_keys[0] if scratchpad_keys else 'key_name'}" # Get progress summary progress_summary = f""" πŸ“Š CURRENT PROGRESS: -- Steps completed: {self.agent_state.steps_completed} -- Tools used: {len(self.agent_state.tool_usage_history)} times""" - - if self.agent_state.tool_usage_history: - recent_tools = self.agent_state.tool_usage_history[-3:] - progress_summary += f"\n- Recent tools: {', '.join(t.tool_name for t in recent_tools)}" +- Overall progress: {self.agent_state.overall_progress}% +- Criteria progress: {sum(1 for p in self.agent_state.criteria_progress.values() if p.status == 'completed')}/{len(self.agent_state.criteria_progress)} completed""" prompt = f"""❌ VALIDATION FAILED - YOU CANNOT PROVIDE A FINAL ANSWER YET! 
@@ -1389,30 +1002,25 @@ Your output is INCOMPLETE and missing critical information. DO NOT attempt another "Final Answer" until you have ALL required data. {progress_summary} -πŸ”§ REQUIRED ACTIONS: -1. STOP trying to provide a Final Answer -2. Switch to using Action/Action Input format -3. Use tools to gather the missing information {scratchpad_info} πŸ’‘ WHAT YOU'RE MISSING: {chr(10).join(f"β€’ {hint}" for hint in missing_data_hints) if missing_data_hints else "β€’ Review the criteria and gather all required data"} -{scratchpad_data_summary} -πŸ“‹ YOUR NEXT STEP: -You MUST use the following format to continue: +{example_usage} -Thought: I need to gather the missing data using tools -Action: [tool name] -Action Input: {{"parameter": "value"}} +πŸ”§ YOUR NEXT STEPS: +1. STOP trying to provide a Final Answer +2. ACCESS your scratchpad data FIRST if you haven't already +3. Use additional tools if needed to gather missing information +4. Only provide Final Answer when ALL data is complete {tools_hint} ⚠️ IMPORTANT REMINDERS: -- The task requires you to retrieve EVERY email, not just summaries -- You already have data in your scratchpad - ACCESS IT FIRST with "Access Scratchpad Memory" -- Each email needs: date, time, subject, sender, recipients, and content snippet -- Continue retrieving details for ALL emails until complete -- Only provide a Final Answer after you have gathered ALL required data +- The task requires COMPLETE data for EVERY item +- You already have data stored - ACCESS IT using "Access Scratchpad Memory" +- Each item needs ALL requested details (dates, subjects, senders, etc.) +- Continue until you have retrieved and processed ALL required data CONTINUE WITH TOOL USAGE NOW - DO NOT ATTEMPT ANOTHER FINAL ANSWER.""" @@ -1446,3 +1054,229 @@ CONTINUE WITH TOOL USAGE NOW - DO NOT ATTEMPT ANOTHER FINAL ANSWER.""" return False return True + + def _update_progress_tracking(self) -> None: + """Analyze current state and update progress tracking for each criterion.""" + if not self.agent_state.acceptance_criteria: + return + + try: + # For each criterion, analyze progress deterministically + for criterion in self.agent_state.acceptance_criteria: + # Get detailed analysis from scratchpad + analysis = self.agent_state.analyze_scratchpad_for_criterion_progress(criterion) + + # Get current progress or initialize + current_progress = self.agent_state.criteria_progress.get(criterion) + if not current_progress: + self.agent_state.initialize_criteria_progress() + current_progress = self.agent_state.criteria_progress.get(criterion) + + # Determine status based on analysis + if analysis["data_completeness"] >= 95: + status = "completed" + elif analysis["data_completeness"] > 0: + status = "in_progress" + else: + status = "not_started" + + # Generate specific next steps + next_steps = self.agent_state.generate_specific_next_steps(criterion) + remaining_work = " | ".join(next_steps[:2]) if next_steps else "Continue gathering data" + + # Build progress notes + notes_parts = [] + if analysis["item_count"] > 0: + notes_parts.append(f"{analysis['item_count']} items found") + if analysis["relevant_data"]: + notes_parts.append(f"Data in: {', '.join(analysis['relevant_data'][:2])}") + if analysis["specific_gaps"]: + notes_parts.append(f"Gap: {analysis['specific_gaps'][0]}") + + progress_notes = " - ".join(notes_parts) if notes_parts else "No data gathered yet" + + # Extract processed items from analysis + processed_items = list(analysis["processed_ids"]) if analysis["processed_ids"] else None + + # Update 
the criterion progress + self.agent_state.update_criterion_progress( + criterion=criterion, + status=status, + progress_notes=progress_notes, + completion_percentage=analysis["data_completeness"], + remaining_work=remaining_work, + processed_items=processed_items + ) + + # Log the update + self._printer.print( + content=f"Progress Update - {criterion}: {analysis['data_completeness']}% complete", + color="cyan" + ) + + # Update focus and next steps based on current progress + self._update_focus_and_next_steps() + + except Exception as e: + self._printer.print( + content=f"Error updating progress tracking: {str(e)}", + color="yellow" + ) + # Use fallback analysis + self._fallback_progress_analysis() + self._update_focus_and_next_steps() + + def _create_forced_final_answer(self) -> str: + """Create a forced final answer based on current progress when max iterations are reached.""" + + # Start with a note about incomplete execution + output_parts = ["Note: Task execution was stopped after reaching maximum iterations."] + + # Add progress summary if acceptance criteria exist + if self.agent_state.acceptance_criteria and self.agent_state.criteria_progress: + output_parts.append("\n## Progress Summary:") + for criterion, progress in self.agent_state.criteria_progress.items(): + status_icon = "βœ…" if progress.status == "completed" else "πŸ”„" if progress.status == "in_progress" else "❌" + output_parts.append(f"{status_icon} {criterion} ({progress.completion_percentage}%)") + if progress.progress_notes: + output_parts.append(f" - {progress.progress_notes}") + + # Add available data from scratchpad + if self.agent_state.scratchpad: + output_parts.append("\n## Available Data:") + for key, value in self.agent_state.scratchpad.items(): + if isinstance(value, list): + output_parts.append(f"\n### {key} ({len(value)} items):") + # Show first few items + for i, item in enumerate(value[:5]): + output_parts.append(f"{i+1}. {self._format_item_for_output(item)}") + if len(value) > 5: + output_parts.append(f"... and {len(value) - 5} more items") + elif isinstance(value, dict): + output_parts.append(f"\n### {key}:") + output_parts.append(self._format_item_for_output(value)) + else: + output_parts.append(f"\n### {key}:") + output_parts.append(str(value)[:200] + "..." if len(str(value)) > 200 else str(value)) + + # If no data available, check recent messages for any useful information + if not self.agent_state.scratchpad and self.messages: + output_parts.append("\n## Recent Activity:") + # Look for the last few assistant messages that might contain results + assistant_messages = [msg for msg in self.messages[-10:] if msg.get("role") == "assistant"] + for msg in assistant_messages[-3:]: + content = msg.get("content", "") + if "Observation:" in content: + # Extract observation content + obs_match = re.search(r"Observation:\s*(.+?)(?:\n|$)", content, re.DOTALL) + if obs_match: + output_parts.append(f"- {obs_match.group(1)[:200]}") + + return "\n".join(output_parts) + + def _format_item_for_output(self, item: Any) -> str: + """Format an item for inclusion in the final output.""" + if isinstance(item, dict): + # Format dictionary items nicely + parts = [] + for key in ["subject", "sender", "date", "snippet", "id"]: # Common email fields + if key in item: + parts.append(f"{key}: {item[key]}") + if parts: + return " | ".join(parts) + else: + # Generic dict formatting + return " | ".join(f"{k}: {v}" for k, v in list(item.items())[:3]) + elif isinstance(item, str): + return item[:100] + "..." 
if len(item) > 100 else item + else: + return str(item)[:100] + + def _update_focus_and_next_steps(self) -> None: + """Update the agent's focus and next steps based on current progress.""" + incomplete_criteria = [ + (c, p) for c, p in self.agent_state.criteria_progress.items() + if p.status != "completed" + ] + + if not incomplete_criteria: + # All criteria completed + self.agent_state.set_focus_and_next_steps( + focus="All acceptance criteria met - ready to provide final answer", + next_steps=["Compile all gathered data into the required format", "Provide comprehensive final answer"] + ) + else: + # Focus on the least progressed criterion + least_progress = min(incomplete_criteria, key=lambda x: x[1].completion_percentage) + criterion_name, progress = least_progress + + if progress.completion_percentage == 0: + focus = f"Need to start working on: {criterion_name}" + next_steps = [ + f"Use tools to gather data for: {criterion_name}", + "Store results in scratchpad for later access" + ] + else: + focus = f"Continue working on: {criterion_name} (currently {progress.completion_percentage}% complete)" + next_steps = [ + f"Complete remaining work: {progress.remaining_work}", + "Verify data completeness before moving to next criterion" + ] + + # Add specific guidance based on scratchpad content + if self.agent_state.scratchpad: + next_steps.append("Access scratchpad data to build on existing progress") + + self.agent_state.set_focus_and_next_steps(focus, next_steps) + + def _fallback_progress_analysis(self) -> None: + """Fallback progress analysis using simple keyword matching.""" + for criterion in self.agent_state.acceptance_criteria: + # Use scratchpad analysis to determine progress + analysis = self.agent_state.analyze_scratchpad_for_criterion_progress(criterion) + + # Determine status and progress + if analysis["data_completeness"] >= 90: + status = "completed" + progress = 100 + remaining = "" + elif analysis["data_completeness"] > 0: + status = "in_progress" + progress = analysis["data_completeness"] + remaining = self._determine_remaining_work(criterion, analysis) + else: + status = "not_started" + progress = 0 + remaining = "Need to gather data for this criterion" + + # Build progress notes + notes = "" + if analysis["relevant_data"]: + notes = f"Found data in: {', '.join(analysis['relevant_data'])}" + + # Update the criterion progress + self.agent_state.update_criterion_progress( + criterion=criterion, + status=status, + progress_notes=notes, + completion_percentage=progress, + remaining_work=remaining + ) + + def _determine_remaining_work(self, criterion: str, analysis: Dict[str, Any]) -> str: + """Determine what work remains for a criterion based on analysis.""" + criterion_lower = criterion.lower() + + # Analyze based on common patterns + if "all" in criterion_lower or "every" in criterion_lower: + if analysis["relevant_data"]: + return "Ensure all items are included, not just a subset" + return "Need to gather comprehensive data covering all items" + + if "date" in criterion_lower or "time" in criterion_lower: + return "Include complete timestamp information" + + if "format" in criterion_lower: + return "Format data according to requirements" + + return "Complete data gathering for this criterion" diff --git a/src/crewai/llm.py b/src/crewai/llm.py index 89ddfe2a1..1e7c705a2 100644 --- a/src/crewai/llm.py +++ b/src/crewai/llm.py @@ -74,7 +74,7 @@ class FilteredStream(io.TextIOBase): "give feedback / get help" in lower_s or "litellm.info:" in lower_s or "litellm" in lower_s - or 
"Consider using a smaller input or implementing a text splitting strategy" in lower_s + or "consider using a smaller input or implementing a text splitting strategy" in lower_s ): return 0 diff --git a/src/crewai/tools/agent_tools/scratchpad_tool.py b/src/crewai/tools/agent_tools/scratchpad_tool.py index 96c2daa42..51ab3800d 100644 --- a/src/crewai/tools/agent_tools/scratchpad_tool.py +++ b/src/crewai/tools/agent_tools/scratchpad_tool.py @@ -1,6 +1,6 @@ """Tool for accessing data stored in the agent's scratchpad during reasoning.""" -from typing import Any, Dict, Optional, Type, Union +from typing import Any, Dict, Optional, Type, Union, Callable from pydantic import BaseModel, Field from crewai.tools import BaseTool @@ -29,6 +29,10 @@ class ScratchpadTool(BaseTool): args_schema: Type[BaseModel] = ScratchpadToolSchema scratchpad_data: Dict[str, Any] = Field(default_factory=dict) + # Allow repeated usage of this tool - scratchpad access should not be limited + cache_function: Callable = lambda _args, _result: False # Don't cache scratchpad access + allow_repeated_usage: bool = True # Allow accessing the same key multiple times + def __init__(self, scratchpad_data: Optional[Dict[str, Any]] = None, **kwargs): """Initialize the scratchpad tool with optional initial data. @@ -53,25 +57,46 @@ class ScratchpadTool(BaseTool): Returns: The value associated with the key, or an error message if not found """ + print(f"[DEBUG] ScratchpadTool._run called with key: '{key}'") + print(f"[DEBUG] Current scratchpad keys: {list(self.scratchpad_data.keys())}") + print(f"[DEBUG] Scratchpad data size: {len(self.scratchpad_data)}") + if not self.scratchpad_data: return ( "❌ SCRATCHPAD IS EMPTY\n\n" "The scratchpad does not contain any data yet.\n" "Data will be automatically stored here as you use other tools.\n" - "Try executing other tools first to gather information." + "Try executing other tools first to gather information.\n\n" + "πŸ’‘ TIP: Tools like search, read, or fetch operations will automatically store their results in the scratchpad." ) if key not in self.scratchpad_data: available_keys = list(self.scratchpad_data.keys()) keys_formatted = "\n".join(f" - '{k}'" for k in available_keys) + # Create more helpful examples based on actual keys + example_key = available_keys[0] if available_keys else 'example_key' + + # Check if the user tried a similar key (case-insensitive or partial match) + similar_keys = [k for k in available_keys if key.lower() in k.lower() or k.lower() in key.lower()] + similarity_hint = "" + if similar_keys: + similarity_hint = f"\n\nπŸ” Did you mean one of these?\n" + "\n".join(f" - '{k}'" for k in similar_keys) + return ( - f"❌ KEY NOT FOUND: '{key}'\n\n" + f"❌ KEY NOT FOUND: '{key}'\n" + f"{'='*50}\n\n" f"The key '{key}' does not exist in the scratchpad.\n\n" - f"Available keys:\n{keys_formatted}\n\n" - f"To retrieve data, use the EXACT key name from the list above.\n" - f"Example Action Input: {{\"key\": \"{available_keys[0] if available_keys else 'example_key'}\"}}\n\n" - f"Remember: Keys are case-sensitive and must match exactly!" 
+ f"πŸ“¦ AVAILABLE KEYS IN SCRATCHPAD:\n{keys_formatted}\n" + f"{similarity_hint}\n\n" + f"βœ… CORRECT USAGE EXAMPLE:\n" + f"Action: Access Scratchpad Memory\n" + f"Action Input: {{\"key\": \"{example_key}\"}}\n\n" + f"⚠️ IMPORTANT:\n" + f"- Keys are case-sensitive and must match EXACTLY\n" + f"- Use the exact key name from the list above\n" + f"- Do NOT modify or guess key names\n\n" + f"{'='*50}" ) value = self.scratchpad_data[key] @@ -79,12 +104,16 @@ class ScratchpadTool(BaseTool): # Format the output nicely based on the type if isinstance(value, dict): import json - return json.dumps(value, indent=2) + formatted_output = f"βœ… Successfully retrieved data for key '{key}':\n\n" + formatted_output += json.dumps(value, indent=2) + return formatted_output elif isinstance(value, list): import json - return json.dumps(value, indent=2) + formatted_output = f"βœ… Successfully retrieved data for key '{key}':\n\n" + formatted_output += json.dumps(value, indent=2) + return formatted_output else: - return str(value) + return f"βœ… Successfully retrieved data for key '{key}':\n\n{str(value)}" def update_scratchpad(self, new_data: Dict[str, Any]) -> None: """Update the scratchpad data and refresh the tool description. diff --git a/src/crewai/tools/base_tool.py b/src/crewai/tools/base_tool.py index fb0428ccd..6ab393c88 100644 --- a/src/crewai/tools/base_tool.py +++ b/src/crewai/tools/base_tool.py @@ -39,6 +39,8 @@ class BaseTool(BaseModel, ABC): """Maximum number of times this tool can be used. None means unlimited usage.""" current_usage_count: int = 0 """Current number of times this tool has been used.""" + allow_repeated_usage: bool = False + """Flag to allow this tool to be used repeatedly with the same arguments.""" @field_validator("args_schema", mode="before") @classmethod @@ -57,7 +59,7 @@ class BaseTool(BaseModel, ABC): }, }, ) - + @field_validator("max_usage_count", mode="before") @classmethod def validate_max_usage_count(cls, v: int | None) -> int | None: @@ -81,11 +83,11 @@ class BaseTool(BaseModel, ABC): # If _run is async, we safely run it if asyncio.iscoroutine(result): result = asyncio.run(result) - + self.current_usage_count += 1 - + return result - + def reset_usage_count(self) -> None: """Reset the current usage count to zero.""" self.current_usage_count = 0 @@ -109,6 +111,8 @@ class BaseTool(BaseModel, ABC): result_as_answer=self.result_as_answer, max_usage_count=self.max_usage_count, current_usage_count=self.current_usage_count, + allow_repeated_usage=self.allow_repeated_usage, + cache_function=self.cache_function, ) @classmethod @@ -272,7 +276,7 @@ def to_langchain( def tool(*args, result_as_answer: bool = False, max_usage_count: int | None = None) -> Callable: """ Decorator to create a tool from a function. - + Args: *args: Positional arguments, either the function to decorate or the tool name. result_as_answer: Flag to indicate if the tool result should be used as the final agent answer. diff --git a/src/crewai/tools/structured_tool.py b/src/crewai/tools/structured_tool.py index b19f5bd1b..5a8cc8e32 100644 --- a/src/crewai/tools/structured_tool.py +++ b/src/crewai/tools/structured_tool.py @@ -25,6 +25,8 @@ class CrewStructuredTool: result_as_answer: bool = False, max_usage_count: int | None = None, current_usage_count: int = 0, + allow_repeated_usage: bool = False, + cache_function: Optional[Callable] = None, ) -> None: """Initialize the structured tool. 
@@ -36,6 +38,8 @@ class CrewStructuredTool: result_as_answer: Whether to return the output directly max_usage_count: Maximum number of times this tool can be used. None means unlimited usage. current_usage_count: Current number of times this tool has been used. + allow_repeated_usage: Whether to allow this tool to be used repeatedly with the same arguments. + cache_function: Function that will be used to determine if the tool should be cached. """ self.name = name self.description = description @@ -45,6 +49,8 @@ class CrewStructuredTool: self.result_as_answer = result_as_answer self.max_usage_count = max_usage_count self.current_usage_count = current_usage_count + self.allow_repeated_usage = allow_repeated_usage + self.cache_function = cache_function if cache_function is not None else lambda _args=None, _result=None: True # Validate the function signature matches the schema self._validate_function_signature() diff --git a/src/crewai/tools/tool_usage.py b/src/crewai/tools/tool_usage.py index 56480d1cc..eea1642c1 100644 --- a/src/crewai/tools/tool_usage.py +++ b/src/crewai/tools/tool_usage.py @@ -149,7 +149,13 @@ class ToolUsage: tool: CrewStructuredTool, calling: Union[ToolCalling, InstructorToolCalling], ) -> str: - if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) + # Check if tool allows repeated usage before blocking + allows_repeated = False + if hasattr(tool, 'allow_repeated_usage'): + allows_repeated = tool.allow_repeated_usage + elif hasattr(tool, '_tool') and hasattr(tool._tool, 'allow_repeated_usage'): + allows_repeated = tool._tool.allow_repeated_usage + if not allows_repeated and self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) try: result = self._i18n.errors("task_repeated_usage").format( tool_names=self.tools_names @@ -369,6 +375,11 @@ class ToolUsage: def _format_result(self, result: Any) -> str: if self.task: self.task.used_tools += 1 + + # Handle None results explicitly + if result is None: + result = "No result returned from tool" + if self._should_remember_format(): result = self._remember_format(result=result) return str(result) @@ -391,9 +402,19 @@ class ToolUsage: if not self.tools_handler: return False if last_tool_usage := self.tools_handler.last_used_tool: - return (calling.tool_name == last_tool_usage.tool_name) and ( + # Add debug logging + print(f"[DEBUG] _check_tool_repeated_usage:") + print(f" Current tool: {calling.tool_name}") + print(f" Current args: {calling.arguments}") + print(f" Last tool: {last_tool_usage.tool_name}") + print(f" Last args: {last_tool_usage.arguments}") + + is_repeated = (calling.tool_name == last_tool_usage.tool_name) and ( calling.arguments == last_tool_usage.arguments ) + print(f" Is repeated: {is_repeated}") + + return is_repeated return False def _check_usage_limit(self, tool: Any, tool_name: str) -> str | None: diff --git a/tests/tools/agent_tools/test_scratchpad_tool.py b/tests/tools/agent_tools/test_scratchpad_tool.py index a509102d3..a3b0ce411 100644 --- a/tests/tools/agent_tools/test_scratchpad_tool.py +++ b/tests/tools/agent_tools/test_scratchpad_tool.py @@ -23,6 +23,8 @@ class TestScratchpadTool: assert "❌ SCRATCHPAD IS EMPTY" in result assert "does not contain any data yet" in result assert "Try executing other tools first" in result + assert "πŸ’‘ TIP:" in result + assert "search, read, or fetch 
operations" in result def test_key_not_found_error_message(self): """Test error message when key is not found.""" @@ -34,11 +36,14 @@ class TestScratchpadTool: result = tool._run(key="wrong_key") assert "❌ KEY NOT FOUND: 'wrong_key'" in result - assert "Available keys:" in result + assert "πŸ“¦ AVAILABLE KEYS IN SCRATCHPAD:" in result assert "- 'existing_key'" in result assert "- 'another_key'" in result - assert 'Example Action Input: {"key": "existing_key"}' in result - assert "Keys are case-sensitive" in result + assert 'βœ… CORRECT USAGE EXAMPLE:' in result + assert 'Action: Access Scratchpad Memory' in result + assert 'Action Input: {"key": "existing_key"}' in result + assert "⚠️ IMPORTANT:" in result + assert "Keys are case-sensitive and must match EXACTLY" in result def test_successful_retrieval_string(self): """Test successful retrieval of string data.""" @@ -47,7 +52,8 @@ class TestScratchpadTool: }) result = tool._run(key="message") - assert result == "Hello, World!" + assert "βœ… Successfully retrieved data for key 'message':" in result + assert "Hello, World!" in result def test_successful_retrieval_dict(self): """Test successful retrieval of dictionary data.""" @@ -57,6 +63,7 @@ class TestScratchpadTool: }) result = tool._run(key="user_data") + assert "βœ… Successfully retrieved data for key 'user_data':" in result assert '"name": "John"' in result assert '"age": 30' in result @@ -68,6 +75,7 @@ class TestScratchpadTool: }) result = tool._run(key="items") + assert "βœ… Successfully retrieved data for key 'items':" in result assert '"item1"' in result assert '"item2"' in result assert '"item3"' in result @@ -134,4 +142,35 @@ class TestScratchpadTool: assert "πŸ“Œ 'nested_dict': list of 3 items" in desc assert "πŸ“Œ 'empty_list': list of 0 items" in desc assert "πŸ“Œ 'boolean_value': bool" in desc - assert "πŸ“Œ 'number': int" in desc \ No newline at end of file + assert "πŸ“Œ 'number': int" in desc + + def test_similar_key_suggestion(self): + """Test that similar keys are suggested when a wrong key is used.""" + tool = ScratchpadTool(scratchpad_data={ + "email_search_results": ["email1", "email2"], + "email_details": {"id": "123"}, + "user_preferences": {"theme": "dark"} + }) + + # Test partial match + result = tool._run(key="email") + assert "πŸ” Did you mean one of these?" in result + + # Check that similar keys are in the suggestions + # Extract just the "Did you mean" section + did_you_mean_section = result.split("πŸ” Did you mean one of these?")[1].split("βœ… CORRECT USAGE EXAMPLE:")[0] + assert "- 'email_search_results'" in did_you_mean_section + assert "- 'email_details'" in did_you_mean_section + assert "- 'user_preferences'" not in did_you_mean_section + + # But user_preferences should still be in the full list + assert "- 'user_preferences'" in result + + # Test case-insensitive match + result = tool._run(key="EMAIL_DETAILS") + assert "πŸ” Did you mean one of these?" in result + assert "- 'email_details'" in result + + # Test no similar keys + result = tool._run(key="completely_different") + assert "πŸ” Did you mean one of these?" not in result \ No newline at end of file