Commit 6ecb30ee87 by João Moura, 2025-06-01 03:08:25 -07:00
Parent: 7c12aeaa0c
8 changed files with 1067 additions and 33 deletions

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, List, Optional, Union, cast
from typing import Any, Callable, Dict, List, Optional, Union, cast, Tuple
import json
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -10,6 +10,7 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.agent_tools.scratchpad_tool import ScratchpadTool
from crewai.llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -19,6 +20,7 @@ from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
get_tool_names,
handle_agent_action_core,
handle_context_length,
handle_max_iterations_exceeded,
@@ -27,7 +29,9 @@ from crewai.utilities.agent_utils import (
has_reached_max_iterations,
is_context_length_exceeded,
process_llm_response,
render_text_description_and_args,
show_agent_logs,
parse_tools,
)
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.logger import Logger
@@ -89,6 +93,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.agent_state: AgentState = AgentState(
task_id=str(task.id) if task else None
)
self.scratchpad_tool: Optional[ScratchpadTool] = None
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
@@ -98,6 +103,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
)
# Initialize scratchpad tool if reasoning is enabled
if hasattr(self.agent, "reasoning") and self.agent.reasoning:
self._initialize_scratchpad_tool()
def _initialize_scratchpad_tool(self) -> None:
"""Initialize the scratchpad tool and add it to available tools."""
self.scratchpad_tool = ScratchpadTool(scratchpad_data=self.agent_state.scratchpad)
# Add to tools list if not already present
tool_names = [tool.name for tool in self.tools]
if self.scratchpad_tool.name not in tool_names:
# Use parse_tools to convert to CrewStructuredTool
parsed_scratchpad_tools = parse_tools([self.scratchpad_tool])
if parsed_scratchpad_tools:
structured_scratchpad_tool = parsed_scratchpad_tools[0]
self.tools.append(structured_scratchpad_tool)
# Update tool mappings
self.tool_name_to_tool_map[self.scratchpad_tool.name] = structured_scratchpad_tool
# Update tool names and descriptions
self.tools_names = get_tool_names(self.tools)
self.tools_description = render_text_description_and_args(self.tools)
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
# Reset agent state for new task execution
self.agent_state.reset(task_id=str(self.task.id) if self.task else None)
@@ -204,7 +233,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
try:
print(f"\n[DEBUG] Starting iteration {self.iterations + 1}, max_iter: {self.max_iter}")
if has_reached_max_iterations(self.iterations, self.max_iter):
print(f"[DEBUG] Max iterations reached")
formatted_answer = handle_max_iterations_exceeded(
formatted_answer,
printer=self._printer,
@@ -216,15 +248,67 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
enforce_rpm_limit(self.request_within_rpm_limit)
print(f"[DEBUG] About to call LLM with {len(self.messages)} messages")
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
)
print(f"[DEBUG] LLM response received: {answer[:100]}..." if answer else "[DEBUG] No LLM response")
formatted_answer = process_llm_response(answer, self.use_stop_words)
print(f"[DEBUG] Formatted answer type: {type(formatted_answer).__name__}")
# Check if agent is trying to finish but hasn't met criteria
if isinstance(formatted_answer, AgentFinish):
print(f"[DEBUG] Agent trying to finish - checking acceptance criteria")
# Validate acceptance criteria if reasoning is enabled and criteria exist
if (hasattr(self.agent, "reasoning") and self.agent.reasoning
and self.agent_state.acceptance_criteria):
self._printer.print(
content="\nValidating acceptance criteria before finalizing...",
color="cyan"
)
print(f"[DEBUG] Starting validation of {len(self.agent_state.acceptance_criteria)} criteria")
is_valid, unmet_criteria = self._validate_acceptance_criteria(formatted_answer.output)
print(f"[DEBUG] Validation result: is_valid={is_valid}, unmet={len(unmet_criteria)}")
if not is_valid:
# Prevent task completion and force retry
self._printer.print(
content=f"\n❌ Cannot finalize - {len(unmet_criteria)} acceptance criteria not met:",
color="red"
)
for criterion in unmet_criteria:
self._printer.print(
content=f"{criterion}",
color="yellow"
)
# Create retry prompt
print(f"[DEBUG] Creating criteria retry prompt")
retry_prompt = self._create_criteria_retry_prompt(unmet_criteria)
# Add retry prompt to messages
self._append_message(retry_prompt, role="user")
# Force another iteration by resetting formatted_answer
formatted_answer = None
print(f"[DEBUG] Forcing another iteration due to unmet criteria")
# Continue the loop
continue
else:
self._printer.print(
content="\n✅ All acceptance criteria met!",
color="green"
)
if isinstance(formatted_answer, AgentAction):
print(f"[DEBUG] Agent action: tool={formatted_answer.tool}")
# Extract agent fingerprint if available
fingerprint_context = {}
if (
@@ -238,6 +322,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
}
print(f"[DEBUG] Executing tool: {formatted_answer.tool}")
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
fingerprint_context=fingerprint_context,
@@ -250,6 +335,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent=self.agent,
function_calling_llm=self.function_calling_llm,
)
print(f"[DEBUG] Tool execution completed")
formatted_answer = self._handle_agent_action(
formatted_answer, tool_result
)
@@ -297,7 +384,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
hasattr(self.agent, "reasoning")
and self.agent.reasoning
and tool_result
and formatted_answer.tool != "Access Scratchpad Memory" # Skip scratchpad tool itself
and self._is_tool_execution_successful(tool_result) # Only for successful executions
):
print(f"[DEBUG] Starting scratchpad extraction for {formatted_answer.tool}")
self._extract_tool_result_to_scratchpad(
tool_name=formatted_answer.tool,
tool_args=tool_args,
@@ -319,8 +409,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
# Increment steps in agent state
self.agent_state.increment_steps()
# Update scratchpad tool if it exists
if self.scratchpad_tool and self.agent_state.scratchpad:
print(f"[DEBUG] Updating scratchpad tool")
self._update_scratchpad_tool()
if self._should_trigger_reasoning():
print(f"[DEBUG] Triggering mid-execution reasoning")
self._handle_mid_execution_reasoning()
print(f"[DEBUG] Mid-execution reasoning completed")
else:
self.steps_since_reasoning += 1
@@ -328,6 +425,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._append_message(formatted_answer.text, role="assistant")
except OutputParserException as e:
print(f"[DEBUG] OutputParserException: {str(e)}")
formatted_answer = handle_output_parser_exception(
e=e,
messages=self.messages,
@@ -337,10 +435,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
except Exception as e:
print(f"[DEBUG] Exception in invoke loop: {type(e).__name__}: {str(e)}")
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
if is_context_length_exceeded(e):
print(f"[DEBUG] Context length exceeded, handling...")
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
@@ -357,12 +457,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
raise e
finally:
self.iterations += 1
print(f"[DEBUG] Iteration {self.iterations} completed")
# During the invoke loop, formatted_answer alternates between AgentAction
# (when the agent is using tools) and eventually becomes AgentFinish
# (when the agent reaches a final answer). This assertion confirms we've
# reached a final answer and helps type checking understand this transition.
assert isinstance(formatted_answer, AgentFinish)
self._show_logs(formatted_answer)
return formatted_answer
@@ -846,8 +948,44 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool_args: Arguments that were passed to the tool
tool_result: The result returned by the tool
"""
print(f"[DEBUG] _extract_tool_result_to_scratchpad started for tool: {tool_name}")
try:
# Check result size and potentially skip LLM extraction for very large results
result_str = str(tool_result.result)
result_size = len(result_str)
print(f"[DEBUG] Tool result size: {result_size} characters")
# For very large results (>100KB), skip LLM extraction and store directly
if result_size > 100000:
print(f"[DEBUG] Result too large ({result_size} chars), storing directly without LLM extraction")
scratchpad_key = tool_name.replace("_", "")
# Try to parse as JSON if possible
try:
if isinstance(tool_result.result, str):
result_data = json.loads(tool_result.result)
else:
result_data = tool_result.result
except (json.JSONDecodeError, TypeError):
result_data = tool_result.result
self.agent_state.add_to_scratchpad(
scratchpad_key,
{
"data": result_data,
"tool": tool_name,
"tool_args": tool_args,
"large_result": True,
"size": result_size
}
)
print(f"[DEBUG] Large result stored directly to scratchpad")
return
# Create a prompt for the LLM to extract relevant information
result_preview = str(tool_result.result)[:200] + "..." if len(str(tool_result.result)) > 200 else str(tool_result.result)
print(f"[DEBUG] Tool result preview: {result_preview}")
extraction_prompt = f"""Given the following tool execution result, extract and summarize the most relevant information that would be useful for completing the current task.
Tool Name: {tool_name}
@@ -866,6 +1004,7 @@ Instructions:
4. Summarize in a concise format (max 3-5 bullet points)
5. Focus on information that will be useful for subsequent steps
6. Generate a descriptive key name that explains what data is being stored (e.g., "email_and_thread_ids", "search_results", "file_contents", etc.)
7. IMPORTANT: When extracting data_points, include ALL items from lists or collections, do not truncate or summarize the data
Respond in the following JSON format:
{{
@@ -874,24 +1013,30 @@ Respond in the following JSON format:
"data_points": {{"key": "value", ...}} or [list of items],
"issues": ["issue1", "issue2", ...] or null if none,
"relevance_score": 1-10 (how relevant this result is to the task)
}}"""
}}
Note: For data_points, preserve the complete data structure. If it's a list of items (like email IDs, search results, etc.), include ALL items."""
# Create messages for LLM call
messages = [format_message_for_llm(extraction_prompt, role="user")]
# Call LLM to extract information
try:
print(f"[DEBUG] Calling LLM for scratchpad extraction...")
extraction_response = get_llm_response(
llm=self.llm,
messages=messages,
callbacks=self.callbacks,
printer=self._printer,
)
print(f"[DEBUG] LLM extraction response received, length: {len(extraction_response)}")
# Try to parse the JSON response directly
try:
extracted_info = json.loads(extraction_response)
print(f"[DEBUG] Successfully parsed JSON directly")
except json.JSONDecodeError:
print(f"[DEBUG] Failed to parse JSON directly, trying to extract from markdown...")
# If direct parsing fails, try to extract JSON from the response
# The LLM might have wrapped it in markdown code blocks or added extra text
json_match = None
@@ -903,16 +1048,19 @@ Respond in the following JSON format:
matches = re.findall(json_pattern, extraction_response, re.DOTALL)
if matches:
print(f"[DEBUG] Found {len(matches)} JSON blocks in markdown")
# Try to parse the first match
for match in matches:
try:
json_match = json.loads(match)
print(f"[DEBUG] Successfully parsed JSON from markdown")
break
except json.JSONDecodeError:
continue
# If no markdown JSON found, try to find raw JSON object
if not json_match:
print(f"[DEBUG] No markdown JSON found, looking for raw JSON...")
# Look for JSON object in the response
json_start = extraction_response.find("{")
json_end = extraction_response.rfind("}")
@@ -926,13 +1074,16 @@ Respond in the following JSON format:
json_start : json_end + 1
]
json_match = json.loads(potential_json)
print(f"[DEBUG] Successfully extracted raw JSON")
except json.JSONDecodeError:
print(f"[DEBUG] Failed to parse raw JSON")
pass
if json_match:
extracted_info = json_match
else:
# Couldn't parse JSON, raise to trigger fallback
print(f"[DEBUG] Could not extract any valid JSON, triggering fallback")
raise json.JSONDecodeError(
"Could not extract JSON", extraction_response, 0
)
@@ -945,6 +1096,7 @@ Respond in the following JSON format:
else:
# Generate a meaningful key from tool name
scratchpad_key = tool_name.replace("_", "")
print(f"[DEBUG] Using scratchpad key: {scratchpad_key}")
# Get the data points
data_points = extracted_info.get("data_points", {})
@@ -965,9 +1117,12 @@ Respond in the following JSON format:
# Store based on relevance score
relevance_score = extracted_info.get("relevance_score", 0)
print(f"[DEBUG] Relevance score: {relevance_score}")
if relevance_score >= 7:
# For high relevance, store just the data
self.agent_state.add_to_scratchpad(scratchpad_key, data_to_store)
print(f"[DEBUG] Stored high relevance data to scratchpad")
else:
# For lower relevance, include more context
self.agent_state.add_to_scratchpad(
@@ -979,6 +1134,7 @@ Respond in the following JSON format:
"relevance": relevance_score,
},
)
print(f"[DEBUG] Stored lower relevance data with context to scratchpad")
# Also store key findings if present and relevance is high
if relevance_score >= 7 and extracted_info.get("key_findings"):
@@ -989,39 +1145,304 @@ Respond in the following JSON format:
self.agent_state.add_to_scratchpad(
"key_findings", current_findings[-10:]
)
print(f"[DEBUG] Updated key findings in scratchpad")
except (json.JSONDecodeError, KeyError, TypeError):
except (json.JSONDecodeError, KeyError, TypeError) as e:
print(f"[DEBUG] Exception during extraction: {type(e).__name__}: {str(e)}")
# Fallback for when we can't extract structured data
# Try to generate a meaningful key name from tool name
scratchpad_key = tool_name.replace("_", "")
# Store a preview of the result
# Store the complete result without truncation
self.agent_state.add_to_scratchpad(
scratchpad_key,
{
"raw_response": extraction_response[:500] + "..."
if len(extraction_response) > 500
else extraction_response,
"tool_result_preview": str(tool_result.result)[:300] + "..."
if len(str(tool_result.result)) > 300
else str(tool_result.result),
"raw_response": extraction_response, # Store complete response
"tool_result": tool_result.result, # Store complete result
"extraction_failed": True,
"tool_args": tool_args
},
)
print(f"[DEBUG] Stored fallback data to scratchpad")
except Exception as e:
# Log error but don't fail the entire execution
print(f"[DEBUG] Failed to extract tool result: {type(e).__name__}: {str(e)}")
self._printer.print(
content=f"Failed to extract tool result to scratchpad: {str(e)}",
color="yellow",
)
# Still store basic information even if extraction fails
# Still store complete information even if extraction fails
fallback_key = f"{tool_name}_raw_{self.agent_state.steps_completed}"
self.agent_state.add_to_scratchpad(
fallback_key,
{
"error": f"Extraction failed: {str(e)}",
"raw_preview": str(tool_result.result)[:200] + "..."
if len(str(tool_result.result)) > 200
else str(tool_result.result),
"tool_result": tool_result.result, # Store complete result
"tool_name": tool_name,
"tool_args": tool_args,
"raw_data": True
},
)
print(f"[DEBUG] Stored error fallback data to scratchpad")
print(f"[DEBUG] _extract_tool_result_to_scratchpad completed")
def _update_scratchpad_tool(self) -> None:
"""Update the scratchpad tool with current state data."""
if not self.scratchpad_tool:
return
# Update the tool's data
self.scratchpad_tool.update_scratchpad(self.agent_state.scratchpad)
# Find and update the tool in our tools list
for i, tool in enumerate(self.tools):
if hasattr(tool, 'name') and tool.name == self.scratchpad_tool.name:
# Update the description on the existing tool reference
if hasattr(tool, '_tool') and hasattr(tool._tool, 'description'):
tool._tool.description = self.scratchpad_tool.description
elif hasattr(tool, 'description'):
tool.description = self.scratchpad_tool.description
break
# Regenerate tools description to reflect the updated tool
self.tools_description = render_text_description_and_args(self.tools)
def _validate_acceptance_criteria(self, output: str) -> Tuple[bool, List[str]]:
"""Validate if the output meets acceptance criteria.
Args:
output: The final output to validate
Returns:
Tuple[bool, List[str]]: (is_valid, list of unmet criteria)
"""
print(f"[DEBUG] _validate_acceptance_criteria started")
if not self.agent_state.acceptance_criteria:
# No criteria to validate
print(f"[DEBUG] No acceptance criteria to validate")
return True, []
# Create a single prompt to check all criteria
criteria_list = "\n".join(
f"{i}. {criterion}"
for i, criterion in enumerate(self.agent_state.acceptance_criteria, 1)
)
print(f"[DEBUG] Validating {len(self.agent_state.acceptance_criteria)} criteria")
validation_prompt = f"""Given the following task output and acceptance criteria, identify which criteria have NOT been met.
Task Output:
{output}
Expected Output Description:
{self.task.expected_output if self.task else "Not specified"}
Acceptance Criteria:
{criteria_list}
For each criterion, determine if it has been met or not met in the output.
Respond with a JSON object where keys are criterion numbers (1, 2, 3, etc.) and values are:
- "MET" if the criterion is satisfied
- "NOT MET: <brief reason>" if the criterion is not satisfied
Example response format:
{{
"1": "MET",
"2": "NOT MET: Missing specific examples",
"3": "MET"
}}
"""
try:
print(f"[DEBUG] Calling LLM for criteria validation...")
response = self.llm.call([
{"role": "user", "content": validation_prompt}
])
print(f"[DEBUG] LLM validation response received")
# Parse the response as JSON
import json
response_str = str(response).strip()
# Try to extract JSON from the response
json_start = response_str.find('{')
json_end = response_str.rfind('}') + 1
if json_start >= 0 and json_end > json_start:
json_str = response_str[json_start:json_end]
validation_results = json.loads(json_str)
print(f"[DEBUG] Successfully parsed validation JSON")
else:
# Fallback if JSON not found
self._logger.log("warning", f"Could not parse validation response as JSON: {response_str}")
print(f"[DEBUG] Failed to parse validation response as JSON")
# Assume all criteria not met if we can't parse
return False, self.agent_state.acceptance_criteria
# Process results
unmet_criteria = []
for i, criterion in enumerate(self.agent_state.acceptance_criteria, 1):
result = validation_results.get(str(i), "NOT MET")
if isinstance(result, str) and result.upper().startswith("NOT MET"):
unmet_criteria.append(criterion)
self._printer.print(
content=f"✗ Criterion not met: {criterion}",
color="yellow"
)
else:
self._printer.print(
content=f"✓ Criterion met: {criterion}",
color="green"
)
print(f"[DEBUG] Validation complete: {len(unmet_criteria)} unmet criteria")
return len(unmet_criteria) == 0, unmet_criteria
except Exception as e:
print(f"[DEBUG] Error validating criteria: {type(e).__name__}: {str(e)}")
self._logger.log("warning", f"Error validating criteria: {str(e)}")
# If we can't validate, assume all criteria are not met to be safe
return False, self.agent_state.acceptance_criteria
def _create_criteria_retry_prompt(self, unmet_criteria: List[str]) -> str:
"""Create a prompt to retry task with unmet criteria.
Args:
unmet_criteria: List of criteria that weren't met
Returns:
str: The retry prompt
"""
# Get task context
task_description = self.task.description if self.task else "Not specified"
expected_output = self.task.expected_output if self.task else "Not specified"
# Build information about what's in the scratchpad
scratchpad_info = ""
scratchpad_data_summary = ""
if self.scratchpad_tool and self.agent_state.scratchpad:
scratchpad_keys = list(self.agent_state.scratchpad.keys())
scratchpad_info = f"""
📦 YOUR SCRATCHPAD CONTAINS DATA:
{chr(10).join(f"'{key}'" for key in scratchpad_keys)}
TO ACCESS THIS DATA: Use the "Access Scratchpad Memory" tool with the key name.
Example:
Action: Access Scratchpad Memory
Action Input: {{"key": "{scratchpad_keys[0] if scratchpad_keys else 'key_name'}"}}
"""
# Add summary of what's in scratchpad
for key in scratchpad_keys[:3]: # Show first 3 keys as examples
value = self.agent_state.scratchpad[key]
if isinstance(value, list):
scratchpad_data_summary += f"\n - '{key}': contains {len(value)} items"
elif isinstance(value, dict):
scratchpad_data_summary += f"\n - '{key}': contains data with {len(value)} fields"
else:
scratchpad_data_summary += f"\n - '{key}': contains stored data"
# Analyze what's missing based on criteria
missing_data_hints = []
for criterion in unmet_criteria:
criterion_lower = criterion.lower()
if "every email" in criterion_lower or "all" in criterion_lower:
missing_data_hints.append("You need to retrieve ALL emails, not just a summary")
if "date" in criterion_lower or "time" in criterion_lower:
missing_data_hints.append("Include complete date/time information for each record")
if "subject" in criterion_lower or "sender" in criterion_lower or "recipients" in criterion_lower:
missing_data_hints.append("Ensure all email metadata (subject, sender, recipients) is included")
if "format" in criterion_lower or "list" in criterion_lower:
missing_data_hints.append("Format the data properly as requested")
if "summary" in criterion_lower or "concise" in criterion_lower:
missing_data_hints.append("Include a concise summary/snippet for each email")
# Get available tools (excluding scratchpad tool)
available_tools = [tool for tool in self.tools_names.split(", ") if tool != "Access Scratchpad Memory"]
tools_hint = f"\n🛠️ AVAILABLE TOOLS: {', '.join(available_tools)}" if available_tools else ""
# Get progress summary
progress_summary = f"""
📊 CURRENT PROGRESS:
- Steps completed: {self.agent_state.steps_completed}
- Tools used: {len(self.agent_state.tool_usage_history)} times"""
if self.agent_state.tool_usage_history:
recent_tools = self.agent_state.tool_usage_history[-3:]
progress_summary += f"\n- Recent tools: {', '.join(t.tool_name for t in recent_tools)}"
prompt = f"""❌ VALIDATION FAILED - YOU CANNOT PROVIDE A FINAL ANSWER YET!
Your output is INCOMPLETE and missing critical information.
🎯 ORIGINAL TASK:
{task_description}
📋 EXPECTED OUTPUT:
{expected_output}
❌ UNMET CRITERIA:
{chr(10).join(f"{criterion}" for criterion in unmet_criteria)}
⚠️ CRITICAL: You MUST go back to using tools to gather the missing data!
DO NOT attempt another "Final Answer" until you have ALL required data.
{progress_summary}
🔧 REQUIRED ACTIONS:
1. STOP trying to provide a Final Answer
2. Switch to using Action/Action Input format
3. Use tools to gather the missing information
{scratchpad_info}
💡 WHAT YOU'RE MISSING:
{chr(10).join(f"{hint}" for hint in missing_data_hints) if missing_data_hints else "• Review the criteria and gather all required data"}
{scratchpad_data_summary}
📋 YOUR NEXT STEP:
You MUST use the following format to continue:
Thought: I need to gather the missing data using tools
Action: [tool name]
Action Input: {{"parameter": "value"}}
{tools_hint}
⚠️ IMPORTANT REMINDERS:
- The task requires you to retrieve EVERY email, not just summaries
- You already have data in your scratchpad - ACCESS IT FIRST with "Access Scratchpad Memory"
- Each email needs: date, time, subject, sender, recipients, and content snippet
- Continue retrieving details for ALL emails until complete
- Only provide a Final Answer after you have gathered ALL required data
CONTINUE WITH TOOL USAGE NOW - DO NOT ATTEMPT ANOTHER FINAL ANSWER."""
return prompt
def _is_tool_execution_successful(self, tool_result: ToolResult) -> bool:
"""Check if a tool execution was successful based on the tool result."""
if tool_result.result is None or tool_result.result == "":
return False
# Check for common error indicators in the result
result_str = str(tool_result.result).lower()
error_indicators = [
"error",
"exception",
"failed",
"unable to",
"couldn't",
"not found",
"invalid",
"wrong tool name",
"don't exist",
"tool usage exception",
"moving on then",
"has reached its usage limit"
]
# If any error indicator is found in the result, consider it a failure
for indicator in error_indicators:
if indicator in result_str:
return False
return True
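
Editor's note: the validation gate added above relies on the LLM answering with a JSON object keyed by criterion number ("1": "MET", "2": "NOT MET: reason", ...). As a minimal standalone sketch of that contract — not the executor's actual `_validate_acceptance_criteria`, and with illustrative names only — the parsing step reduces to:

import json
from typing import List, Tuple

def parse_validation_response(response: str, criteria: List[str]) -> Tuple[bool, List[str]]:
    """Map numbered MET / NOT MET verdicts back onto the original criteria list."""
    start, end = response.find("{"), response.rfind("}") + 1
    if start < 0 or end <= start:
        # Mirror the conservative fallback above: an unparsable reply counts as all criteria unmet.
        return False, list(criteria)
    try:
        verdicts = json.loads(response[start:end])
    except json.JSONDecodeError:
        return False, list(criteria)
    unmet = [
        criterion
        for i, criterion in enumerate(criteria, 1)
        if str(verdicts.get(str(i), "NOT MET")).upper().startswith("NOT MET")
    ]
    return len(unmet) == 0, unmet

# parse_validation_response('{"1": "MET", "2": "NOT MET: no examples"}',
#                           ["Cover every email", "Give examples"])
# -> (False, ["Give examples"])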

View File

@@ -314,7 +314,7 @@ class Crew(FlowTrackable, BaseModel):
def create_crew_memory(self) -> "Crew":
"""Initialize private memory attributes."""
self._external_memory = (
# External memory doesnt support a default value since it was designed to be managed entirely externally
# External memory doesn't support a default value since it was designed to be managed entirely externally
self.external_memory.set_crew(self) if self.external_memory else None
)
@@ -1081,6 +1081,23 @@ class Crew(FlowTrackable, BaseModel):
token_usage=token_usage,
)
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
total_usage_metrics = UsageMetrics()
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
self.usage_metrics = total_usage_metrics
return total_usage_metrics
def _process_async_tasks(
self,
futures: List[Tuple[Task, Future[TaskOutput], int]],
@@ -1284,23 +1301,6 @@ class Crew(FlowTrackable, BaseModel):
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
total_usage_metrics = UsageMetrics()
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
self.usage_metrics = total_usage_metrics
return total_usage_metrics
def test(
self,
n_iterations: int,
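
Editor's note: the crew.py hunks only relocate `_finish_execution` and `calculate_usage_metrics` within the class (the bodies are unchanged), so behaviour is the same. For context, a hedged sketch of how the aggregated metrics are typically consumed — assuming an already-configured `crew` instance and the usual UsageMetrics field names:

# Illustrative only: `crew` is assumed to be a configured Crew instance.
result = crew.kickoff()
metrics = crew.calculate_usage_metrics()  # also cached on crew.usage_metrics
print(f"tokens used: {metrics.total_tokens}, llm calls: {metrics.successful_requests}")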

View File

@@ -1 +1,6 @@
"""Agent tools for crewAI."""
from .agent_tools import AgentTools
from .scratchpad_tool import ScratchpadTool
__all__ = ["AgentTools", "ScratchpadTool"]

View File

@@ -0,0 +1,145 @@
"""Tool for accessing data stored in the agent's scratchpad during reasoning."""
from typing import Any, Dict, Optional, Type, Union
from pydantic import BaseModel, Field
from crewai.tools import BaseTool
class ScratchpadToolSchema(BaseModel):
"""Input schema for ScratchpadTool."""
key: str = Field(
...,
description=(
"The key name to retrieve data from the scratchpad. "
"Must be one of the available keys shown in the tool description. "
"Example: if 'email_data' is listed as available, use {\"key\": \"email_data\"}"
)
)
class ScratchpadTool(BaseTool):
"""Tool that allows agents to access data stored in their scratchpad during task execution.
This tool's description is dynamically updated to show all available keys,
making it easy for agents to know what data they can retrieve.
"""
name: str = "Access Scratchpad Memory"
description: str = "Access data stored in your scratchpad memory during task execution."
args_schema: Type[BaseModel] = ScratchpadToolSchema
scratchpad_data: Dict[str, Any] = Field(default_factory=dict)
def __init__(self, scratchpad_data: Optional[Dict[str, Any]] = None, **kwargs):
"""Initialize the scratchpad tool with optional initial data.
Args:
scratchpad_data: Initial scratchpad data (usually from agent state)
"""
super().__init__(**kwargs)
if scratchpad_data:
self.scratchpad_data = scratchpad_data
self._update_description()
def _run(
self,
key: str,
**kwargs: Any,
) -> Union[str, Dict[str, Any], Any]:
"""Retrieve data from the scratchpad using the specified key.
Args:
key: The key to look up in the scratchpad
Returns:
The value associated with the key, or an error message if not found
"""
if not self.scratchpad_data:
return (
"❌ SCRATCHPAD IS EMPTY\n\n"
"The scratchpad does not contain any data yet.\n"
"Data will be automatically stored here as you use other tools.\n"
"Try executing other tools first to gather information."
)
if key not in self.scratchpad_data:
available_keys = list(self.scratchpad_data.keys())
keys_formatted = "\n".join(f" - '{k}'" for k in available_keys)
return (
f"❌ KEY NOT FOUND: '{key}'\n\n"
f"The key '{key}' does not exist in the scratchpad.\n\n"
f"Available keys:\n{keys_formatted}\n\n"
f"To retrieve data, use the EXACT key name from the list above.\n"
f"Example Action Input: {{\"key\": \"{available_keys[0] if available_keys else 'example_key'}\"}}\n\n"
f"Remember: Keys are case-sensitive and must match exactly!"
)
value = self.scratchpad_data[key]
# Format the output nicely based on the type
if isinstance(value, dict):
import json
return json.dumps(value, indent=2)
elif isinstance(value, list):
import json
return json.dumps(value, indent=2)
else:
return str(value)
def update_scratchpad(self, new_data: Dict[str, Any]) -> None:
"""Update the scratchpad data and refresh the tool description.
Args:
new_data: The new complete scratchpad data
"""
self.scratchpad_data = new_data
self._update_description()
def _update_description(self) -> None:
"""Update the tool description to include all available keys."""
base_description = (
"Access data stored in your scratchpad memory during task execution.\n\n"
"HOW TO USE THIS TOOL:\n"
"Provide a JSON object with a 'key' field containing the exact name of the data you want to retrieve.\n"
"Example: {\"key\": \"email_data\"}"
)
if not self.scratchpad_data:
self.description = (
f"{base_description}\n\n"
"📝 STATUS: Scratchpad is currently empty.\n"
"Data will be automatically stored here as you use other tools."
)
return
# Build a description of available keys with a preview of their contents
key_descriptions = []
example_key = None
for key, value in self.scratchpad_data.items():
if not example_key:
example_key = key
# Create a brief description of what's stored
if isinstance(value, dict):
preview = f"dict with {len(value)} items"
if 'data' in value and isinstance(value['data'], list):
preview = f"list of {len(value['data'])} items"
elif isinstance(value, list):
preview = f"list of {len(value)} items"
elif isinstance(value, str):
preview = f"string ({len(value)} chars)"
else:
preview = type(value).__name__
key_descriptions.append(f" 📌 '{key}': {preview}")
available_keys_text = "\n".join(key_descriptions)
self.description = (
f"{base_description}\n\n"
f"📦 AVAILABLE DATA IN SCRATCHPAD:\n{available_keys_text}\n\n"
f"💡 EXAMPLE USAGE:\n"
f"To retrieve the '{example_key}' data, use:\n"
f"Action Input: {{\"key\": \"{example_key}\"}}"
)
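
Editor's note: taken together, ScratchpadTool is a small read-only key/value view over the agent state: `_run` looks a key up and pretty-prints it, and `update_scratchpad` replaces the backing dict and regenerates the description. A short usage sketch based only on the API shown above (calling `_run` directly for illustration; during execution the executor invokes it through the normal tool-use path):

from crewai.tools.agent_tools.scratchpad_tool import ScratchpadTool

tool = ScratchpadTool(scratchpad_data={"email_ids": ["a1", "b2", "c3"]})
print(tool.description)            # lists 'email_ids' as an available key
print(tool._run(key="email_ids"))  # JSON-formatted list with all three ids
print(tool._run(key="missing"))    # "KEY NOT FOUND" message listing valid keys

# update_scratchpad replaces (rather than merges) the stored data,
# so previously available keys disappear from the description.
tool.update_scratchpad({"search_results": {"count": 5, "status": "success"}})
print(tool._run(key="email_ids"))  # now reported as KEY NOT FOUND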

View File

@@ -41,7 +41,8 @@
"wrong_tool_name": "You tried to use the tool {tool}, but it doesn't exist. You must use one of the following tools, use one at time: {tools}.",
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}",
"agent_tool_execution_error": "Error executing task with agent '{agent_role}'. Error: {error}",
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error.",
"criteria_validation_error": "### Your answer did not meet all acceptance criteria\n\n### Unmet criteria:\n{unmet_criteria}\n\n### Previous result:\n{task_output}\n\n\nPlease revise your answer to ensure ALL acceptance criteria are met. Use the 'Access Scratchpad Memory' tool if you need to retrieve any previously collected information."
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolutely everything you know, don't reference things but instead explain them.",

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,215 @@
"""Unit tests for acceptance criteria validation feature at task level."""
import pytest
from unittest.mock import MagicMock, patch, call
from typing import List, Tuple
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.agent_state import AgentState
from crewai.tools.agent_tools.scratchpad_tool import ScratchpadTool
from crewai.agents.parser import AgentFinish
from crewai.utilities import Printer
from crewai.llm import LLM
class TestAcceptanceCriteriaValidation:
"""Test suite for task-level acceptance criteria validation functionality."""
def setup_method(self):
"""Set up test fixtures."""
self.mock_llm = MagicMock(spec=LLM)
self.mock_agent = MagicMock()
self.mock_task = MagicMock()
self.mock_crew = MagicMock()
self.mock_tools_handler = MagicMock()
# Set up agent attributes
self.mock_agent.role = "Test Agent"
self.mock_agent.reasoning = True
self.mock_agent.verbose = False
self.mock_agent.reasoning_interval = None
self.mock_agent.adaptive_reasoning = False
# Create executor
self.executor = CrewAgentExecutor(
llm=self.mock_llm,
task=self.mock_task,
crew=self.mock_crew,
agent=self.mock_agent,
prompt={},
max_iter=10,
tools=[],
tools_names="",
stop_words=[],
tools_description="",
tools_handler=self.mock_tools_handler,
callbacks=[]
)
# Set up agent state with acceptance criteria
self.executor.agent_state = AgentState(task_id="test-task-id")
self.executor.agent_state.acceptance_criteria = [
"Include all required information",
"Format output properly",
"Provide complete analysis"
]
# Mock printer
self.executor._printer = MagicMock(spec=Printer)
def test_validate_acceptance_criteria_all_met(self):
"""Test validation when all acceptance criteria are met."""
output = "Complete output with all information, properly formatted, with full analysis"
# Configure LLM to return all criteria met
self.mock_llm.call.return_value = '''{
"1": "MET",
"2": "MET",
"3": "MET"
}'''
is_valid, unmet_criteria = self.executor._validate_acceptance_criteria(output)
assert is_valid is True
assert unmet_criteria == []
assert self.mock_llm.call.call_count == 1
def test_validate_acceptance_criteria_some_unmet(self):
"""Test validation when some criteria are not met."""
output = "Partial output missing formatting"
# Configure LLM to return mixed results
self.mock_llm.call.return_value = '''{
"1": "MET",
"2": "NOT MET: Missing proper formatting",
"3": "NOT MET: Analysis incomplete"
}'''
is_valid, unmet_criteria = self.executor._validate_acceptance_criteria(output)
assert is_valid is False
assert len(unmet_criteria) == 2
assert "Format output properly" in unmet_criteria
assert "Provide complete analysis" in unmet_criteria
def test_create_criteria_retry_prompt_with_scratchpad(self):
"""Test retry prompt creation when scratchpad has data."""
# Set up scratchpad tool with data
self.executor.scratchpad_tool = ScratchpadTool()
self.executor.agent_state.scratchpad = {
"research_data": {"key": "value"},
"analysis_results": ["item1", "item2"]
}
# Set up task details
self.mock_task.description = "Analyze research data and provide insights"
self.mock_task.expected_output = "A comprehensive report with analysis and recommendations"
unmet_criteria = ["Include specific examples", "Add recommendations"]
prompt = self.executor._create_criteria_retry_prompt(unmet_criteria)
# Verify prompt content with new format
assert "VALIDATION FAILED" in prompt
assert "YOU CANNOT PROVIDE A FINAL ANSWER YET" in prompt
assert "ORIGINAL TASK:" in prompt
assert "Analyze research data" in prompt
assert "EXPECTED OUTPUT:" in prompt
assert "comprehensive report" in prompt
assert "Include specific examples" in prompt
assert "Add recommendations" in prompt
assert "Access Scratchpad Memory" in prompt
assert "'research_data'" in prompt
assert "'analysis_results'" in prompt
assert "Action:" in prompt
assert "Action Input:" in prompt
assert "CONTINUE WITH TOOL USAGE NOW" in prompt
assert "DO NOT ATTEMPT ANOTHER FINAL ANSWER" in prompt
def test_create_criteria_retry_prompt_without_scratchpad(self):
"""Test retry prompt creation when no scratchpad data exists."""
unmet_criteria = ["Add more detail"]
prompt = self.executor._create_criteria_retry_prompt(unmet_criteria)
assert "Add more detail" in prompt
assert "VALIDATION FAILED" in prompt
assert "📦 YOUR SCRATCHPAD CONTAINS DATA" not in prompt
@patch('crewai.agents.crew_agent_executor.get_llm_response')
@patch('crewai.agents.crew_agent_executor.process_llm_response')
def test_invoke_loop_blocks_incomplete_final_answer(self, mock_process, mock_get_response):
"""Test that invoke loop blocks incomplete final answers."""
# Set up conditions
self.executor.agent_state.acceptance_criteria = ["Complete all sections"]
# First attempt returns incomplete final answer
incomplete_answer = AgentFinish(
thought="Done",
output="Exploring potential follow-up tasks!",
text="Final Answer: Exploring potential follow-up tasks!"
)
# After retry, return complete answer
complete_answer = AgentFinish(
thought="Done with all sections",
output="Complete output with all sections addressed",
text="Final Answer: Complete output with all sections addressed"
)
# Configure mocks
mock_process.side_effect = [incomplete_answer, complete_answer]
mock_get_response.return_value = "response"
# Configure validation
self.mock_llm.call.side_effect = [
'{"1": "NOT MET: Missing required sections"}', # First validation fails
'{"1": "MET"}' # Second validation passes
]
# Execute
result = self.executor._invoke_loop()
# Verify
assert result == complete_answer
assert self.mock_llm.call.call_count == 2 # Two validation attempts
assert mock_process.call_count == 2 # Two processing attempts
# Verify error message was shown
self._verify_validation_messages_shown()
def test_validation_happens_on_every_final_answer_attempt(self):
"""Test that validation happens on every AgentFinish attempt."""
self.executor.agent_state.acceptance_criteria = ["Complete all sections"]
# Configure LLM to always return criteria not met
self.mock_llm.call.return_value = '{"1": "NOT MET: Missing required sections"}'
output = "Incomplete output"
# Validate multiple times - each should trigger validation
for _ in range(3):
is_valid, unmet_criteria = self.executor._validate_acceptance_criteria(output)
assert is_valid is False
assert len(unmet_criteria) == 1
# Verify validation was called every time
assert self.mock_llm.call.call_count == 3
def _verify_validation_messages_shown(self):
"""Helper to verify validation messages were displayed."""
print_calls = self.executor._printer.print.call_args_list
# Check for validation message
validation_msg_shown = any(
"Validating acceptance criteria" in str(call)
for call in print_calls
)
# Check for failure message
failure_msg_shown = any(
"Cannot finalize" in str(call)
for call in print_calls
)
assert validation_msg_shown or failure_msg_shown

View File

@@ -0,0 +1,137 @@
"""Unit tests for the ScratchpadTool."""
import pytest
from crewai.tools.agent_tools.scratchpad_tool import ScratchpadTool, ScratchpadToolSchema
class TestScratchpadTool:
"""Test suite for the ScratchpadTool functionality."""
def test_schema_description(self):
"""Test that the schema has helpful description."""
schema = ScratchpadToolSchema
key_field = schema.model_fields['key']
assert "Example:" in key_field.description
assert '{"key":' in key_field.description
def test_empty_scratchpad_error_message(self):
"""Test error message when scratchpad is empty."""
tool = ScratchpadTool()
result = tool._run(key="nonexistent")
assert "❌ SCRATCHPAD IS EMPTY" in result
assert "does not contain any data yet" in result
assert "Try executing other tools first" in result
def test_key_not_found_error_message(self):
"""Test error message when key is not found."""
tool = ScratchpadTool(scratchpad_data={
"existing_key": "value",
"another_key": {"data": "test"}
})
result = tool._run(key="wrong_key")
assert "❌ KEY NOT FOUND: 'wrong_key'" in result
assert "Available keys:" in result
assert "- 'existing_key'" in result
assert "- 'another_key'" in result
assert 'Example Action Input: {"key": "existing_key"}' in result
assert "Keys are case-sensitive" in result
def test_successful_retrieval_string(self):
"""Test successful retrieval of string data."""
tool = ScratchpadTool(scratchpad_data={
"message": "Hello, World!"
})
result = tool._run(key="message")
assert result == "Hello, World!"
def test_successful_retrieval_dict(self):
"""Test successful retrieval of dictionary data."""
test_dict = {"name": "John", "age": 30}
tool = ScratchpadTool(scratchpad_data={
"user_data": test_dict
})
result = tool._run(key="user_data")
assert '"name": "John"' in result
assert '"age": 30' in result
def test_successful_retrieval_list(self):
"""Test successful retrieval of list data."""
test_list = ["item1", "item2", "item3"]
tool = ScratchpadTool(scratchpad_data={
"items": test_list
})
result = tool._run(key="items")
assert '"item1"' in result
assert '"item2"' in result
assert '"item3"' in result
def test_tool_description_empty(self):
"""Test tool description when scratchpad is empty."""
tool = ScratchpadTool()
assert "HOW TO USE THIS TOOL:" in tool.description
assert 'Example: {"key": "email_data"}' in tool.description
assert "📝 STATUS: Scratchpad is currently empty" in tool.description
def test_tool_description_with_data(self):
"""Test tool description when scratchpad has data."""
tool = ScratchpadTool(scratchpad_data={
"emails": ["email1@test.com", "email2@test.com"],
"results": {"count": 5, "status": "success"},
"api_key": "secret_key_123"
})
desc = tool.description
# Check basic structure
assert "HOW TO USE THIS TOOL:" in desc
assert "📦 AVAILABLE DATA IN SCRATCHPAD:" in desc
assert "💡 EXAMPLE USAGE:" in desc
# Check key listings
assert "📌 'emails': list of 2 items" in desc
assert "📌 'results': dict with 2 items" in desc
assert "📌 'api_key': string (14 chars)" in desc
# Check example uses first key
assert 'Action Input: {"key": "emails"}' in desc
def test_update_scratchpad(self):
"""Test updating scratchpad data."""
tool = ScratchpadTool()
# Initially empty
assert not tool.scratchpad_data
# Update with data
new_data = {"test": "value"}
tool.update_scratchpad(new_data)
assert tool.scratchpad_data == new_data
assert "📌 'test': string (5 chars)" in tool.description
def test_complex_data_preview(self):
"""Test preview generation for complex data structures."""
tool = ScratchpadTool(scratchpad_data={
"nested_dict": {
"data": ["item1", "item2", "item3"]
},
"empty_list": [],
"boolean_value": True,
"number": 42
})
desc = tool.description
# Special case for dict with 'data' key containing list
assert "📌 'nested_dict': list of 3 items" in desc
assert "📌 'empty_list': list of 0 items" in desc
assert "📌 'boolean_value': bool" in desc
assert "📌 'number': int" in desc