populating state

This commit is contained in:
João Moura
2025-06-01 01:27:26 -07:00
parent 7009a6b7a0
commit 4fcabd391f

View File

@@ -1,4 +1,5 @@
from typing import Any, Callable, Dict, List, Optional, Union, cast
import json
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -85,7 +86,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool.name: tool for tool in self.tools
}
self.steps_since_reasoning = 0
self.agent_state: AgentState = AgentState(task_id=str(task.id) if task else None)
self.agent_state: AgentState = AgentState(
task_id=str(task.id) if task else None
)
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
@@ -147,20 +150,35 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""Populate agent state from the reasoning output if available."""
try:
# Check if the agent has reasoning output from the initial reasoning
if hasattr(self.agent, '_last_reasoning_output') and self.agent._last_reasoning_output:
if (
hasattr(self.agent, "_last_reasoning_output")
and self.agent._last_reasoning_output
):
reasoning_output = self.agent._last_reasoning_output
# Extract structured plan if available
if reasoning_output.plan.structured_plan:
self.agent_state.set_original_plan(reasoning_output.plan.structured_plan.steps)
self.agent_state.acceptance_criteria = reasoning_output.plan.structured_plan.acceptance_criteria
self.agent_state.set_original_plan(
reasoning_output.plan.structured_plan.steps
)
self.agent_state.acceptance_criteria = (
reasoning_output.plan.structured_plan.acceptance_criteria
)
elif reasoning_output.plan.plan:
# Fallback: try to extract steps from unstructured plan
plan_lines = [line.strip() for line in reasoning_output.plan.plan.split('\n') if line.strip()]
plan_lines = [
line.strip()
for line in reasoning_output.plan.plan.split("\n")
if line.strip()
]
# Take meaningful lines that look like steps (skip headers, empty lines, etc.)
steps = []
for line in plan_lines:
if line and not line.startswith('###') and not line.startswith('**'):
if (
line
and not line.startswith("###")
and not line.startswith("**")
):
steps.append(line)
if len(steps) >= 10: # Limit to 10 steps
break
@@ -240,33 +258,66 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
# Record detailed tool usage in agent state
if hasattr(formatted_answer, 'tool') and formatted_answer.tool:
if hasattr(formatted_answer, "tool") and formatted_answer.tool:
# Extract tool arguments from the agent action
tool_args = {}
if hasattr(formatted_answer, 'tool_input') and formatted_answer.tool_input:
if (
hasattr(formatted_answer, "tool_input")
and formatted_answer.tool_input
):
if isinstance(formatted_answer.tool_input, dict):
tool_args = formatted_answer.tool_input
elif isinstance(formatted_answer.tool_input, str):
# Try to parse JSON if it's a string
try:
import json
tool_args = json.loads(formatted_answer.tool_input)
except (json.JSONDecodeError, TypeError):
tool_args = {"input": formatted_answer.tool_input}
# Truncate result for summary
result_summary = None
if tool_result and hasattr(tool_result, 'result'):
if tool_result and hasattr(tool_result, "result"):
result_str = str(tool_result.result)
result_summary = result_str[:200] + "..." if len(result_str) > 200 else result_str
result_summary = (
result_str[:200] + "..."
if len(result_str) > 200
else result_str
)
# Record the tool usage with arguments
self.agent_state.record_tool_usage(
tool_name=formatted_answer.tool,
arguments=tool_args,
result_summary=result_summary
result_summary=result_summary,
)
# Extract relevant information to scratchpad if reasoning is enabled
# Note: This only happens for actual tool executions (AgentAction with tool),
# not for reasoning steps or other agent outputs
if (
hasattr(self.agent, "reasoning")
and self.agent.reasoning
and tool_result
):
self._extract_tool_result_to_scratchpad(
tool_name=formatted_answer.tool,
tool_args=tool_args,
tool_result=tool_result,
)
# Always print agent state (temporary debug)
print(
f"Agent State:\n"
f"Raw: {self.agent_state.model_dump_json()}\n"
f"[AGENT STATE] Step {self.agent_state.steps_completed}:\n"
f"Tool: {formatted_answer.tool}\n"
f"Scratchpad: {self.agent_state.scratchpad}\n"
f"Tool History: {len(self.agent_state.tool_usage_history)} entries\n"
f"Full State Context:\n{self.agent_state.to_context_string()}"
)
# Increment steps in agent state
self.agent_state.increment_steps()
@@ -520,7 +571,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if not hasattr(self.agent, "reasoning") or not self.agent.reasoning:
return False
if hasattr(self.agent, "reasoning_interval") and self.agent.reasoning_interval is not None:
if (
hasattr(self.agent, "reasoning_interval")
and self.agent.reasoning_interval is not None
):
return self.steps_since_reasoning >= self.agent.reasoning_interval
if hasattr(self.agent, "adaptive_reasoning") and self.agent.adaptive_reasoning:
@@ -538,7 +592,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""
if self._has_recent_errors():
try:
from crewai.utilities.events.reasoning_events import AgentAdaptiveReasoningDecisionEvent
from crewai.utilities.events.reasoning_events import (
AgentAdaptiveReasoningDecisionEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
crewai_event_bus.emit(
@@ -565,7 +621,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
for usage in self.agent_state.tool_usage_history:
tool_desc = f"{usage.tool_name}"
if usage.arguments:
args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2])
args_preview = ", ".join(
f"{k}={v}" for k, v in list(usage.arguments.items())[:2]
)
tool_desc += f"({args_preview})"
tools_used_detailed.append(tool_desc)
@@ -575,15 +633,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
# Detect patterns in tool usage
tool_patterns = self._detect_tool_patterns()
if tool_patterns:
tool_stats['recent_patterns'] = tool_patterns
tool_stats["recent_patterns"] = tool_patterns
reasoning_handler = AgentReasoning(task=self.task, agent=cast(Agent, self.agent))
reasoning_handler = AgentReasoning(
task=self.task, agent=cast(Agent, self.agent)
)
return reasoning_handler.should_adaptive_reason_llm(
current_steps=self.iterations,
tools_used=tools_used_detailed,
current_progress=current_progress,
tool_usage_stats=tool_stats
tool_usage_stats=tool_stats,
)
except Exception as e:
self._printer.print(
@@ -605,32 +665,47 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
patterns = []
# Check for repeated use of the same tool with similar arguments
recent_tools = self.agent_state.tool_usage_history[-5:] if len(self.agent_state.tool_usage_history) >= 5 else self.agent_state.tool_usage_history
recent_tools = (
self.agent_state.tool_usage_history[-5:]
if len(self.agent_state.tool_usage_history) >= 5
else self.agent_state.tool_usage_history
)
# Count consecutive uses of the same tool
if len(recent_tools) >= 2:
consecutive_count = 1
for i in range(1, len(recent_tools)):
if recent_tools[i].tool_name == recent_tools[i-1].tool_name:
if recent_tools[i].tool_name == recent_tools[i - 1].tool_name:
consecutive_count += 1
if consecutive_count >= 3:
patterns.append(f"Same tool ({recent_tools[i].tool_name}) used {consecutive_count} times consecutively")
patterns.append(
f"Same tool ({recent_tools[i].tool_name}) used {consecutive_count} times consecutively"
)
else:
consecutive_count = 1
# Check for tools with empty or error results
error_count = 0
for usage in recent_tools:
if usage.result_summary and any(keyword in usage.result_summary.lower()
for keyword in ['error', 'failed', 'not found', 'empty']):
if usage.result_summary and any(
keyword in usage.result_summary.lower()
for keyword in ["error", "failed", "not found", "empty"]
):
error_count += 1
if error_count >= 2:
patterns.append(f"{error_count} tools returned errors or empty results recently")
patterns.append(
f"{error_count} tools returned errors or empty results recently"
)
# Check for rapid tool switching (might indicate confusion)
if len(set(usage.tool_name for usage in recent_tools)) == len(recent_tools) and len(recent_tools) >= 4:
patterns.append("Rapid switching between different tools without repetition")
if (
len(set(usage.tool_name for usage in recent_tools)) == len(recent_tools)
and len(recent_tools) >= 4
):
patterns.append(
"Rapid switching between different tools without repetition"
)
return "; ".join(patterns) if patterns else None
@@ -652,14 +727,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from crewai.agent import Agent
reasoning_handler = AgentReasoning(task=self.task, agent=cast(Agent, self.agent))
reasoning_handler = AgentReasoning(
task=self.task, agent=cast(Agent, self.agent)
)
# Build detailed tools used list from agent state
tools_used_detailed = []
for usage in self.agent_state.tool_usage_history:
tool_desc = f"{usage.tool_name}"
if usage.arguments:
args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2])
args_preview = ", ".join(
f"{k}={v}" for k, v in list(usage.arguments.items())[:2]
)
tool_desc += f"({args_preview})"
tools_used_detailed.append(tool_desc)
@@ -667,31 +746,28 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
current_steps=self.iterations,
tools_used=tools_used_detailed,
current_progress=current_progress,
iteration_messages=self.messages
iteration_messages=self.messages,
)
# Update agent state with new plan if available
if reasoning_output.plan.structured_plan:
self.agent_state.update_last_plan(reasoning_output.plan.structured_plan.steps)
self.agent_state.update_last_plan(
reasoning_output.plan.structured_plan.steps
)
# Update acceptance criteria if they changed
if reasoning_output.plan.structured_plan.acceptance_criteria:
self.agent_state.acceptance_criteria = reasoning_output.plan.structured_plan.acceptance_criteria
self.agent_state.acceptance_criteria = (
reasoning_output.plan.structured_plan.acceptance_criteria
)
# Add a note about the reasoning update to scratchpad
self.agent_state.add_to_scratchpad(
f"reasoning_update_{self.iterations}",
{
"reason": "Mid-execution reasoning triggered",
"updated_plan": bool(reasoning_output.plan.structured_plan)
}
)
# Don't add reasoning metadata to scratchpad - keep it exclusively for tool results
updated_plan_msg = (
self._i18n.retrieve("reasoning", "mid_execution_reasoning_update").format(
plan=reasoning_output.plan.plan
) +
f"\n\nUpdated State:\n{self.agent_state.to_context_string()}" +
"\n\nRemember: strictly follow the updated plan above and ensure the final answer fully meets the EXPECTED OUTPUT criteria."
self._i18n.retrieve(
"reasoning", "mid_execution_reasoning_update"
).format(plan=reasoning_output.plan.plan)
+ f"\n\nUpdated State:\n{self.agent_state.to_context_string()}"
+ "\n\nRemember: strictly follow the updated plan above and ensure the final answer fully meets the EXPECTED OUTPUT criteria."
)
self._append_message(updated_plan_msg, role="assistant")
@@ -711,7 +787,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
str: A summary of the current progress.
"""
recent_messages = self.messages[-5:] if len(self.messages) >= 5 else self.messages
recent_messages = (
self.messages[-5:] if len(self.messages) >= 5 else self.messages
)
summary = f"After {self.iterations} steps, "
@@ -721,8 +799,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
summary += f"I've used {tool_summary['total_tool_uses']} tools ({tool_summary['unique_tools']} unique). "
# Include most frequently used tools
if tool_summary['tools_by_frequency']:
top_tools = list(tool_summary['tools_by_frequency'].items())[:3]
if tool_summary["tools_by_frequency"]:
top_tools = list(tool_summary["tools_by_frequency"].items())[:3]
tools_str = ", ".join(f"{tool} ({count}x)" for tool, count in top_tools)
summary += f"Most used: {tools_str}. "
@@ -731,7 +809,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
last_tool = self.agent_state.tool_usage_history[-1]
summary += f"Last tool: {last_tool.tool_name}"
if last_tool.arguments:
args_str = ", ".join(f"{k}={v}" for k, v in list(last_tool.arguments.items())[:2])
args_str = ", ".join(
f"{k}={v}" for k, v in list(last_tool.arguments.items())[:2]
)
summary += f" with args ({args_str})"
summary += ". "
else:
@@ -748,10 +828,206 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _has_recent_errors(self) -> bool:
"""Check for error indicators in recent messages."""
error_indicators = ["error", "exception", "failed", "unable to", "couldn't"]
recent_messages = self.messages[-3:] if len(self.messages) >= 3 else self.messages
recent_messages = (
self.messages[-3:] if len(self.messages) >= 3 else self.messages
)
for message in recent_messages:
content = message.get("content", "").lower()
if any(indicator in content for indicator in error_indicators):
return True
return False
@staticmethod
def _parse_extraction_response(extraction_response: str) -> Dict[str, Any]:
    """Recover a JSON object from an LLM response, tolerating markdown wrapping.

    Tries, in order: direct parse, fenced ```json``` blocks, then the widest
    brace-delimited substring.

    Args:
        extraction_response: Raw text returned by the LLM.

    Returns:
        The parsed JSON object.

    Raises:
        json.JSONDecodeError: If no parseable JSON object can be recovered.
    """
    try:
        return json.loads(extraction_response)
    except json.JSONDecodeError:
        pass
    # The LLM may have wrapped the JSON in markdown code blocks.
    import re

    json_pattern = r"```(?:json)?\s*(\{.*?\})\s*```"
    for match in re.findall(json_pattern, extraction_response, re.DOTALL):
        try:
            return json.loads(match)
        except json.JSONDecodeError:
            continue
    # Last resort: the widest {...} span in the raw response.
    json_start = extraction_response.find("{")
    json_end = extraction_response.rfind("}")
    if json_start != -1 and json_end > json_start:
        try:
            return json.loads(extraction_response[json_start : json_end + 1])
        except json.JSONDecodeError:
            pass
    raise json.JSONDecodeError("Could not extract JSON", extraction_response, 0)

def _extract_tool_result_to_scratchpad(
    self, tool_name: str, tool_args: Dict[str, Any], tool_result: ToolResult
) -> None:
    """Extract relevant information from tool result using LLM and add to scratchpad.

    This method uses the agent's LLM to intelligently extract and summarize
    important information from tool results, storing it in the agent's scratchpad
    for future reference during task execution. Extraction is best-effort: any
    failure degrades to storing a raw preview of the tool result rather than
    interrupting task execution.

    Args:
        tool_name: Name of the tool that was executed
        tool_args: Arguments that were passed to the tool
        tool_result: The result returned by the tool
    """
    try:
        # Create a prompt for the LLM to extract relevant information
        extraction_prompt = f"""Given the following tool execution result, extract and summarize the most relevant information that would be useful for completing the current task.

Tool Name: {tool_name}
Tool Arguments: {json.dumps(tool_args, indent=2) if tool_args else "None"}
Tool Result: {tool_result.result}

Current Task Context:
- Task Description: {self.task.description if self.task else "Not specified"}
- Expected Output: {self.task.expected_output if self.task else "Not specified"}
- Steps Completed: {self.agent_state.steps_completed}

Instructions:
1. Identify the KEY INFORMATION from the tool result that directly relates to the task
2. Extract any important data points, facts, or findings
3. Note any errors, warnings, or issues that might affect task completion
4. Summarize in a concise format (max 3-5 bullet points)
5. Focus on information that will be useful for subsequent steps
6. Generate a descriptive key name that explains what data is being stored (e.g., "email_and_thread_ids", "search_results", "file_contents", etc.)

Respond in the following JSON format:
{{
    "suggested_key_name": "descriptive_name_for_this_data",
    "key_findings": ["finding1", "finding2", ...],
    "data_points": {{"key": "value", ...}} or [list of items],
    "issues": ["issue1", "issue2", ...] or null if none,
    "relevance_score": 1-10 (how relevant this result is to the task)
}}"""

        # Create messages for LLM call
        messages = [format_message_for_llm(extraction_prompt, role="user")]

        # Bind before the LLM call so the fallback handler below can always
        # reference it, even when get_llm_response itself raises.
        extraction_response = ""
        try:
            extraction_response = get_llm_response(
                llm=self.llm,
                messages=messages,
                callbacks=self.callbacks,
                printer=self._printer,
            )
            # Raises json.JSONDecodeError when no JSON can be recovered,
            # which routes us into the unstructured fallback below.
            extracted_info = self._parse_extraction_response(extraction_response)

            # Use the suggested key name or fall back to default
            suggested_key = extracted_info.get("suggested_key_name", "")
            if suggested_key and suggested_key.replace("_", "").isalnum():
                scratchpad_key = suggested_key
            else:
                # NOTE(review): this strips underscores entirely (e.g.
                # "web_search" -> "websearch") — confirm this key format
                # is intended rather than keeping the tool name as-is.
                scratchpad_key = tool_name.replace("_", "")

            # Get the data points and simplify the stored structure:
            # a single-key dict wrapping a list is unwrapped to the list.
            data_points = extracted_info.get("data_points", {})
            if isinstance(data_points, list):
                data_to_store = data_points
            elif isinstance(data_points, dict) and len(data_points) == 1:
                single_key = list(data_points.keys())[0]
                if isinstance(data_points[single_key], list):
                    data_to_store = data_points[single_key]
                else:
                    data_to_store = data_points
            else:
                data_to_store = data_points

            # Store based on relevance score: high-relevance data is stored
            # bare; lower relevance keeps extra context for later triage.
            relevance_score = extracted_info.get("relevance_score", 0)
            if relevance_score >= 7:
                self.agent_state.add_to_scratchpad(scratchpad_key, data_to_store)
            else:
                self.agent_state.add_to_scratchpad(
                    scratchpad_key,
                    {
                        "data": data_to_store,
                        "tool": tool_name,
                        "findings": extracted_info.get("key_findings", []),
                        "relevance": relevance_score,
                    },
                )

            # Accumulate high-relevance findings, capped at the 10 most recent.
            if relevance_score >= 7 and extracted_info.get("key_findings"):
                current_findings = self.agent_state.scratchpad.get(
                    "key_findings", []
                )
                current_findings.extend(extracted_info["key_findings"])
                self.agent_state.add_to_scratchpad(
                    "key_findings", current_findings[-10:]
                )
        except (json.JSONDecodeError, KeyError, TypeError):
            # Fallback for when we can't extract structured data: store
            # previews of the raw LLM response and the tool result.
            scratchpad_key = tool_name.replace("_", "")
            self.agent_state.add_to_scratchpad(
                scratchpad_key,
                {
                    "raw_response": extraction_response[:500] + "..."
                    if len(extraction_response) > 500
                    else extraction_response,
                    "tool_result_preview": str(tool_result.result)[:300] + "..."
                    if len(str(tool_result.result)) > 300
                    else str(tool_result.result),
                },
            )
    except Exception as e:
        # Log error but don't fail the entire execution
        self._printer.print(
            content=f"Failed to extract tool result to scratchpad: {str(e)}",
            color="yellow",
        )
        # Still store basic information even if extraction fails
        fallback_key = f"{tool_name}_raw_{self.agent_state.steps_completed}"
        self.agent_state.add_to_scratchpad(
            fallback_key,
            {
                "error": f"Extraction failed: {str(e)}",
                "raw_preview": str(tool_result.result)[:200] + "..."
                if len(str(tool_result.result)) > 200
                else str(tool_result.result),
            },
        )