From 4fcabd391f7ea9311d003c5b1fd8f1a4aa6d2512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Sun, 1 Jun 2025 01:27:26 -0700 Subject: [PATCH] populating state --- src/crewai/agents/crew_agent_executor.py | 372 ++++++++++++++++++++--- 1 file changed, 324 insertions(+), 48 deletions(-) diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 0a99708f5..16aedc6fe 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -1,4 +1,5 @@ from typing import Any, Callable, Dict, List, Optional, Union, cast +import json from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin @@ -85,7 +86,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): tool.name: tool for tool in self.tools } self.steps_since_reasoning = 0 - self.agent_state: AgentState = AgentState(task_id=str(task.id) if task else None) + self.agent_state: AgentState = AgentState( + task_id=str(task.id) if task else None + ) existing_stop = self.llm.stop or [] self.llm.stop = list( set( @@ -147,20 +150,35 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """Populate agent state from the reasoning output if available.""" try: # Check if the agent has reasoning output from the initial reasoning - if hasattr(self.agent, '_last_reasoning_output') and self.agent._last_reasoning_output: + if ( + hasattr(self.agent, "_last_reasoning_output") + and self.agent._last_reasoning_output + ): reasoning_output = self.agent._last_reasoning_output # Extract structured plan if available if reasoning_output.plan.structured_plan: - self.agent_state.set_original_plan(reasoning_output.plan.structured_plan.steps) - self.agent_state.acceptance_criteria = reasoning_output.plan.structured_plan.acceptance_criteria + self.agent_state.set_original_plan( + reasoning_output.plan.structured_plan.steps + ) + self.agent_state.acceptance_criteria = ( + reasoning_output.plan.structured_plan.acceptance_criteria + ) elif reasoning_output.plan.plan: # Fallback: try to extract steps from unstructured plan - plan_lines = [line.strip() for line in reasoning_output.plan.plan.split('\n') if line.strip()] + plan_lines = [ + line.strip() + for line in reasoning_output.plan.plan.split("\n") + if line.strip() + ] # Take meaningful lines that look like steps (skip headers, empty lines, etc.) 
steps = [] for line in plan_lines: - if line and not line.startswith('###') and not line.startswith('**'): + if ( + line + and not line.startswith("###") + and not line.startswith("**") + ): steps.append(line) if len(steps) >= 10: # Limit to 10 steps break @@ -240,33 +258,66 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ) # Record detailed tool usage in agent state - if hasattr(formatted_answer, 'tool') and formatted_answer.tool: + if hasattr(formatted_answer, "tool") and formatted_answer.tool: # Extract tool arguments from the agent action tool_args = {} - if hasattr(formatted_answer, 'tool_input') and formatted_answer.tool_input: + if ( + hasattr(formatted_answer, "tool_input") + and formatted_answer.tool_input + ): if isinstance(formatted_answer.tool_input, dict): tool_args = formatted_answer.tool_input elif isinstance(formatted_answer.tool_input, str): # Try to parse JSON if it's a string try: import json + tool_args = json.loads(formatted_answer.tool_input) except (json.JSONDecodeError, TypeError): tool_args = {"input": formatted_answer.tool_input} # Truncate result for summary result_summary = None - if tool_result and hasattr(tool_result, 'result'): + if tool_result and hasattr(tool_result, "result"): result_str = str(tool_result.result) - result_summary = result_str[:200] + "..." if len(result_str) > 200 else result_str + result_summary = ( + result_str[:200] + "..." + if len(result_str) > 200 + else result_str + ) # Record the tool usage with arguments self.agent_state.record_tool_usage( tool_name=formatted_answer.tool, arguments=tool_args, - result_summary=result_summary + result_summary=result_summary, ) + # Extract relevant information to scratchpad if reasoning is enabled + # Note: This only happens for actual tool executions (AgentAction with tool), + # not for reasoning steps or other agent outputs + if ( + hasattr(self.agent, "reasoning") + and self.agent.reasoning + and tool_result + ): + self._extract_tool_result_to_scratchpad( + tool_name=formatted_answer.tool, + tool_args=tool_args, + tool_result=tool_result, + ) + + # Always print agent state (temporary debug) + print( + f"Agent State:\n" + f"Raw: {self.agent_state.model_dump_json()}\n" + f"[AGENT STATE] Step {self.agent_state.steps_completed}:\n" + f"Tool: {formatted_answer.tool}\n" + f"Scratchpad: {self.agent_state.scratchpad}\n" + f"Tool History: {len(self.agent_state.tool_usage_history)} entries\n" + f"Full State Context:\n{self.agent_state.to_context_string()}" + ) + # Increment steps in agent state self.agent_state.increment_steps() @@ -520,7 +571,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if not hasattr(self.agent, "reasoning") or not self.agent.reasoning: return False - if hasattr(self.agent, "reasoning_interval") and self.agent.reasoning_interval is not None: + if ( + hasattr(self.agent, "reasoning_interval") + and self.agent.reasoning_interval is not None + ): return self.steps_since_reasoning >= self.agent.reasoning_interval if hasattr(self.agent, "adaptive_reasoning") and self.agent.adaptive_reasoning: @@ -538,7 +592,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ if self._has_recent_errors(): try: - from crewai.utilities.events.reasoning_events import AgentAdaptiveReasoningDecisionEvent + from crewai.utilities.events.reasoning_events import ( + AgentAdaptiveReasoningDecisionEvent, + ) from crewai.utilities.events.crewai_event_bus import crewai_event_bus crewai_event_bus.emit( @@ -565,7 +621,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): for usage in 
self.agent_state.tool_usage_history: tool_desc = f"{usage.tool_name}" if usage.arguments: - args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2]) + args_preview = ", ".join( + f"{k}={v}" for k, v in list(usage.arguments.items())[:2] + ) tool_desc += f"({args_preview})" tools_used_detailed.append(tool_desc) @@ -575,15 +633,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): # Detect patterns in tool usage tool_patterns = self._detect_tool_patterns() if tool_patterns: - tool_stats['recent_patterns'] = tool_patterns + tool_stats["recent_patterns"] = tool_patterns - reasoning_handler = AgentReasoning(task=self.task, agent=cast(Agent, self.agent)) + reasoning_handler = AgentReasoning( + task=self.task, agent=cast(Agent, self.agent) + ) return reasoning_handler.should_adaptive_reason_llm( current_steps=self.iterations, tools_used=tools_used_detailed, current_progress=current_progress, - tool_usage_stats=tool_stats + tool_usage_stats=tool_stats, ) except Exception as e: self._printer.print( @@ -605,32 +665,47 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): patterns = [] # Check for repeated use of the same tool with similar arguments - recent_tools = self.agent_state.tool_usage_history[-5:] if len(self.agent_state.tool_usage_history) >= 5 else self.agent_state.tool_usage_history + recent_tools = ( + self.agent_state.tool_usage_history[-5:] + if len(self.agent_state.tool_usage_history) >= 5 + else self.agent_state.tool_usage_history + ) # Count consecutive uses of the same tool if len(recent_tools) >= 2: consecutive_count = 1 for i in range(1, len(recent_tools)): - if recent_tools[i].tool_name == recent_tools[i-1].tool_name: + if recent_tools[i].tool_name == recent_tools[i - 1].tool_name: consecutive_count += 1 if consecutive_count >= 3: - patterns.append(f"Same tool ({recent_tools[i].tool_name}) used {consecutive_count} times consecutively") + patterns.append( + f"Same tool ({recent_tools[i].tool_name}) used {consecutive_count} times consecutively" + ) else: consecutive_count = 1 # Check for tools with empty or error results error_count = 0 for usage in recent_tools: - if usage.result_summary and any(keyword in usage.result_summary.lower() - for keyword in ['error', 'failed', 'not found', 'empty']): + if usage.result_summary and any( + keyword in usage.result_summary.lower() + for keyword in ["error", "failed", "not found", "empty"] + ): error_count += 1 if error_count >= 2: - patterns.append(f"{error_count} tools returned errors or empty results recently") + patterns.append( + f"{error_count} tools returned errors or empty results recently" + ) # Check for rapid tool switching (might indicate confusion) - if len(set(usage.tool_name for usage in recent_tools)) == len(recent_tools) and len(recent_tools) >= 4: - patterns.append("Rapid switching between different tools without repetition") + if ( + len(set(usage.tool_name for usage in recent_tools)) == len(recent_tools) + and len(recent_tools) >= 4 + ): + patterns.append( + "Rapid switching between different tools without repetition" + ) return "; ".join(patterns) if patterns else None @@ -652,14 +727,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): from crewai.agent import Agent - reasoning_handler = AgentReasoning(task=self.task, agent=cast(Agent, self.agent)) + reasoning_handler = AgentReasoning( + task=self.task, agent=cast(Agent, self.agent) + ) # Build detailed tools used list from agent state tools_used_detailed = [] for usage in self.agent_state.tool_usage_history: tool_desc = f"{usage.tool_name}" if 
usage.arguments: - args_preview = ", ".join(f"{k}={v}" for k, v in list(usage.arguments.items())[:2]) + args_preview = ", ".join( + f"{k}={v}" for k, v in list(usage.arguments.items())[:2] + ) tool_desc += f"({args_preview})" tools_used_detailed.append(tool_desc) @@ -667,31 +746,28 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): current_steps=self.iterations, tools_used=tools_used_detailed, current_progress=current_progress, - iteration_messages=self.messages + iteration_messages=self.messages, ) # Update agent state with new plan if available if reasoning_output.plan.structured_plan: - self.agent_state.update_last_plan(reasoning_output.plan.structured_plan.steps) + self.agent_state.update_last_plan( + reasoning_output.plan.structured_plan.steps + ) # Update acceptance criteria if they changed if reasoning_output.plan.structured_plan.acceptance_criteria: - self.agent_state.acceptance_criteria = reasoning_output.plan.structured_plan.acceptance_criteria + self.agent_state.acceptance_criteria = ( + reasoning_output.plan.structured_plan.acceptance_criteria + ) - # Add a note about the reasoning update to scratchpad - self.agent_state.add_to_scratchpad( - f"reasoning_update_{self.iterations}", - { - "reason": "Mid-execution reasoning triggered", - "updated_plan": bool(reasoning_output.plan.structured_plan) - } - ) + # Don't add reasoning metadata to scratchpad - keep it exclusively for tool results updated_plan_msg = ( - self._i18n.retrieve("reasoning", "mid_execution_reasoning_update").format( - plan=reasoning_output.plan.plan - ) + - f"\n\nUpdated State:\n{self.agent_state.to_context_string()}" + - "\n\nRemember: strictly follow the updated plan above and ensure the final answer fully meets the EXPECTED OUTPUT criteria." + self._i18n.retrieve( + "reasoning", "mid_execution_reasoning_update" + ).format(plan=reasoning_output.plan.plan) + + f"\n\nUpdated State:\n{self.agent_state.to_context_string()}" + + "\n\nRemember: strictly follow the updated plan above and ensure the final answer fully meets the EXPECTED OUTPUT criteria." ) self._append_message(updated_plan_msg, role="assistant") @@ -711,7 +787,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: str: A summary of the current progress. """ - recent_messages = self.messages[-5:] if len(self.messages) >= 5 else self.messages + recent_messages = ( + self.messages[-5:] if len(self.messages) >= 5 else self.messages + ) summary = f"After {self.iterations} steps, " @@ -721,8 +799,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): summary += f"I've used {tool_summary['total_tool_uses']} tools ({tool_summary['unique_tools']} unique). " # Include most frequently used tools - if tool_summary['tools_by_frequency']: - top_tools = list(tool_summary['tools_by_frequency'].items())[:3] + if tool_summary["tools_by_frequency"]: + top_tools = list(tool_summary["tools_by_frequency"].items())[:3] tools_str = ", ".join(f"{tool} ({count}x)" for tool, count in top_tools) summary += f"Most used: {tools_str}. " @@ -731,7 +809,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): last_tool = self.agent_state.tool_usage_history[-1] summary += f"Last tool: {last_tool.tool_name}" if last_tool.arguments: - args_str = ", ".join(f"{k}={v}" for k, v in list(last_tool.arguments.items())[:2]) + args_str = ", ".join( + f"{k}={v}" for k, v in list(last_tool.arguments.items())[:2] + ) summary += f" with args ({args_str})" summary += ". 
" else: @@ -748,10 +828,206 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def _has_recent_errors(self) -> bool: """Check for error indicators in recent messages.""" error_indicators = ["error", "exception", "failed", "unable to", "couldn't"] - recent_messages = self.messages[-3:] if len(self.messages) >= 3 else self.messages + recent_messages = ( + self.messages[-3:] if len(self.messages) >= 3 else self.messages + ) for message in recent_messages: content = message.get("content", "").lower() if any(indicator in content for indicator in error_indicators): return True return False + + def _extract_tool_result_to_scratchpad( + self, tool_name: str, tool_args: Dict[str, Any], tool_result: ToolResult + ) -> None: + """Extract relevant information from tool result using LLM and add to scratchpad. + + This method uses the agent's LLM to intelligently extract and summarize + important information from tool results, storing it in the agent's scratchpad + for future reference during task execution. + + Args: + tool_name: Name of the tool that was executed + tool_args: Arguments that were passed to the tool + tool_result: The result returned by the tool + """ + try: + # Create a prompt for the LLM to extract relevant information + extraction_prompt = f"""Given the following tool execution result, extract and summarize the most relevant information that would be useful for completing the current task. + +Tool Name: {tool_name} +Tool Arguments: {json.dumps(tool_args, indent=2) if tool_args else "None"} +Tool Result: {tool_result.result} + +Current Task Context: +- Task Description: {self.task.description if self.task else "Not specified"} +- Expected Output: {self.task.expected_output if self.task else "Not specified"} +- Steps Completed: {self.agent_state.steps_completed} + +Instructions: +1. Identify the KEY INFORMATION from the tool result that directly relates to the task +2. Extract any important data points, facts, or findings +3. Note any errors, warnings, or issues that might affect task completion +4. Summarize in a concise format (max 3-5 bullet points) +5. Focus on information that will be useful for subsequent steps +6. Generate a descriptive key name that explains what data is being stored (e.g., "email_and_thread_ids", "search_results", "file_contents", etc.) + +Respond in the following JSON format: +{{ + "suggested_key_name": "descriptive_name_for_this_data", + "key_findings": ["finding1", "finding2", ...], + "data_points": {{"key": "value", ...}} or [list of items], + "issues": ["issue1", "issue2", ...] 
or null if none, + "relevance_score": 1-10 (how relevant this result is to the task) +}}""" + + # Create messages for LLM call + messages = [format_message_for_llm(extraction_prompt, role="user")] + + # Call LLM to extract information + try: + extraction_response = get_llm_response( + llm=self.llm, + messages=messages, + callbacks=self.callbacks, + printer=self._printer, + ) + + # Try to parse the JSON response directly + try: + extracted_info = json.loads(extraction_response) + except json.JSONDecodeError: + # If direct parsing fails, try to extract JSON from the response + # The LLM might have wrapped it in markdown code blocks or added extra text + json_match = None + + # Try to find JSON in markdown code blocks + import re + + json_pattern = r"```(?:json)?\s*(\{.*?\})\s*```" + matches = re.findall(json_pattern, extraction_response, re.DOTALL) + + if matches: + # Try to parse the first match + for match in matches: + try: + json_match = json.loads(match) + break + except json.JSONDecodeError: + continue + + # If no markdown JSON found, try to find raw JSON object + if not json_match: + # Look for JSON object in the response + json_start = extraction_response.find("{") + json_end = extraction_response.rfind("}") + if ( + json_start != -1 + and json_end != -1 + and json_end > json_start + ): + try: + potential_json = extraction_response[ + json_start : json_end + 1 + ] + json_match = json.loads(potential_json) + except json.JSONDecodeError: + pass + + if json_match: + extracted_info = json_match + else: + # Couldn't parse JSON, raise to trigger fallback + raise json.JSONDecodeError( + "Could not extract JSON", extraction_response, 0 + ) + + # Process the extracted info + # Use the suggested key name or fall back to default + suggested_key = extracted_info.get("suggested_key_name", "") + if suggested_key and suggested_key.replace("_", "").isalnum(): + scratchpad_key = suggested_key + else: + # Generate a meaningful key from tool name + scratchpad_key = tool_name.replace("_", "") + + # Get the data points + data_points = extracted_info.get("data_points", {}) + + # Simplify the data structure based on what's extracted + if isinstance(data_points, list): + # If data_points is already a list, store it directly + data_to_store = data_points + elif isinstance(data_points, dict) and len(data_points) == 1: + # If it's a dict with a single key containing a list, extract the list + single_key = list(data_points.keys())[0] + if isinstance(data_points[single_key], list): + data_to_store = data_points[single_key] + else: + data_to_store = data_points + else: + data_to_store = data_points + + # Store based on relevance score + relevance_score = extracted_info.get("relevance_score", 0) + if relevance_score >= 7: + # For high relevance, store just the data + self.agent_state.add_to_scratchpad(scratchpad_key, data_to_store) + else: + # For lower relevance, include more context + self.agent_state.add_to_scratchpad( + scratchpad_key, + { + "data": data_to_store, + "tool": tool_name, + "findings": extracted_info.get("key_findings", []), + "relevance": relevance_score, + }, + ) + + # Also store key findings if present and relevance is high + if relevance_score >= 7 and extracted_info.get("key_findings"): + current_findings = self.agent_state.scratchpad.get( + "key_findings", [] + ) + current_findings.extend(extracted_info["key_findings"]) + self.agent_state.add_to_scratchpad( + "key_findings", current_findings[-10:] + ) + + except (json.JSONDecodeError, KeyError, TypeError): + # Fallback for when we can't 
extract structured data + # Try to generate a meaningful key name from tool name + scratchpad_key = tool_name.replace("_", "") + + # Store a preview of the result + self.agent_state.add_to_scratchpad( + scratchpad_key, + { + "raw_response": extraction_response[:500] + "..." + if len(extraction_response) > 500 + else extraction_response, + "tool_result_preview": str(tool_result.result)[:300] + "..." + if len(str(tool_result.result)) > 300 + else str(tool_result.result), + }, + ) + + except Exception as e: + # Log error but don't fail the entire execution + self._printer.print( + content=f"Failed to extract tool result to scratchpad: {str(e)}", + color="yellow", + ) + # Still store basic information even if extraction fails + fallback_key = f"{tool_name}_raw_{self.agent_state.steps_completed}" + self.agent_state.add_to_scratchpad( + fallback_key, + { + "error": f"Extraction failed: {str(e)}", + "raw_preview": str(tool_result.result)[:200] + "..." + if len(str(tool_result.result)) > 200 + else str(tool_result.result), + }, + )
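
Note on the extraction fallback: the JSON-recovery chain in _extract_tool_result_to_scratchpad (direct parse, then fenced ```json blocks, then the outermost brace pair) is self-contained enough to read as a standalone helper. A minimal sketch of that same chain, using a hypothetical parse_llm_json name that is not part of this patch:

    import json
    import re
    from typing import Any, Dict, Optional


    def parse_llm_json(response: str) -> Optional[Dict[str, Any]]:
        """Best-effort recovery of a JSON object from an LLM response.

        Mirrors the fallback chain in _extract_tool_result_to_scratchpad:
        direct json.loads, then fenced code blocks, then the outermost
        brace-delimited substring; returns None when nothing parses.
        """
        # 1. Happy path: the model returned bare JSON.
        try:
            parsed = json.loads(response)
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass

        # 2. JSON wrapped in a markdown code fence (``` or ```json).
        for candidate in re.findall(
            r"```(?:json)?\s*(\{.*?\})\s*```", response, re.DOTALL
        ):
            try:
                return json.loads(candidate)
            except json.JSONDecodeError:
                continue

        # 3. Raw JSON object embedded in surrounding prose.
        start, end = response.find("{"), response.rfind("}")
        if start != -1 and end > start:
            try:
                return json.loads(response[start : end + 1])
            except json.JSONDecodeError:
                pass

        return None

Once a payload is recovered, the relevance_score >= 7 threshold decides whether the scratchpad stores the bare data or the data wrapped with tool name, findings, and score, which keeps high-signal results terse while preserving context for weaker ones.

For how the populated state is exercised, the sketch below strings together the AgentState calls that appear in these hunks. Only the call surface is visible in the patch, so the import path and the tool name are assumptions:

    # The method names come straight from this patch; the import path is a
    # guess, since the AgentState import sits outside the visible hunks.
    from crewai.agents.agent_state import AgentState  # assumed location

    state = AgentState(task_id="task-123")
    state.set_original_plan(["Search the inbox", "Summarize the top thread"])
    state.acceptance_criteria = ["Summary covers every unread message"]

    state.record_tool_usage(
        tool_name="gmail_search",  # hypothetical tool
        arguments={"query": "is:unread"},
        result_summary="3 unread messages found",
    )
    state.add_to_scratchpad("email_and_thread_ids", ["t-1", "t-2", "t-3"])
    state.increment_steps()

    # to_context_string() is what mid-execution reasoning appends to the
    # conversation as "Updated State: ...".
    print(state.to_context_string())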