diff --git a/src/crewai/agent.py b/src/crewai/agent.py index 651ace43c..b367c0ddd 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -2,7 +2,7 @@ import shutil import subprocess from typing import Any, Dict, List, Literal, Optional, Sequence, Type, Union -from pydantic import Field, InstanceOf, PrivateAttr, model_validator +from pydantic import Field, InstanceOf, PrivateAttr, field_validator, model_validator from crewai.agents import CacheHandler from crewai.agents.agent_builder.base_agent import BaseAgent @@ -139,6 +139,13 @@ class Agent(BaseAgent): default=None, description="Interval of steps after which the agent should reason again during execution. If None, reasoning only happens before execution.", ) + + @field_validator('reasoning_interval') + @classmethod + def validate_reasoning_interval(cls, v): + if v is not None and v < 1: + raise ValueError("reasoning_interval must be >= 1") + return v adaptive_reasoning: bool = Field( default=False, description="Whether the agent should adaptively decide when to reason during execution based on context.", diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 63b81f83e..5d1369ff2 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -1,3 +1,4 @@ +from collections import deque from typing import Any, Callable, Dict, List, Optional, Union from crewai.agents.agent_builder.base_agent import BaseAgent @@ -83,7 +84,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = { tool.name: tool for tool in self.tools } - self.tools_used: List[str] = [] + self.tools_used: deque = deque(maxlen=100) # Limit history size self.steps_since_reasoning = 0 existing_stop = self.llm.stop or [] self.llm.stop = list( @@ -192,7 +193,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if self._should_trigger_reasoning(): self._handle_mid_execution_reasoning() - + self._invoke_step_callback(formatted_answer) self._append_message(formatted_answer.text, role="assistant") @@ -240,7 +241,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if hasattr(formatted_answer, 'tool') and formatted_answer.tool: if formatted_answer.tool not in self.tools_used: self.tools_used.append(formatted_answer.tool) - + # Special case for add_image_tool add_image_tool = self._i18n.tools("add_image") if ( @@ -459,107 +460,117 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): ), color="red", ) - + def _should_trigger_reasoning(self) -> bool: """ Determine if mid-execution reasoning should be triggered. - + Returns: bool: True if reasoning should be triggered, False otherwise. """ if not hasattr(self.agent, "reasoning") or not self.agent.reasoning: return False - + self.steps_since_reasoning += 1 - + if hasattr(self.agent, "reasoning_interval") and self.agent.reasoning_interval: if self.steps_since_reasoning >= self.agent.reasoning_interval: return True - + if hasattr(self.agent, "adaptive_reasoning") and self.agent.adaptive_reasoning: return self._should_adaptive_reason() - + return False - + def _should_adaptive_reason(self) -> bool: """ Determine if adaptive reasoning should be triggered based on execution context. - + Returns: bool: True if adaptive reasoning should be triggered, False otherwise. """ - - if len(set(self.tools_used[-3:])) > 1 and len(self.tools_used) >= 3: - return True - - if self.iterations > self.max_iter // 2: - return True - + return ( + self._has_used_multiple_tools_recently() or + self._is_taking_too_long() or + self._has_recent_errors() + ) + + def _has_used_multiple_tools_recently(self) -> bool: + """Check if multiple different tools were used in recent steps.""" + if len(self.tools_used) < 3: + return False + return len(set(list(self.tools_used)[-3:])) > 1 + + def _is_taking_too_long(self) -> bool: + """Check if iterations exceed expected duration.""" + return self.iterations > self.max_iter // 2 + + def _has_recent_errors(self) -> bool: + """Check for error indicators in recent messages.""" error_indicators = ["error", "exception", "failed", "unable to", "couldn't"] recent_messages = self.messages[-3:] if len(self.messages) >= 3 else self.messages - + for message in recent_messages: content = message.get("content", "").lower() if any(indicator in content for indicator in error_indicators): return True - return False - + def _handle_mid_execution_reasoning(self) -> None: """ Handle mid-execution reasoning by calling the reasoning handler. """ if not hasattr(self.agent, "reasoning") or not self.agent.reasoning: return - + try: from crewai.utilities.reasoning_handler import AgentReasoning - + current_progress = self._summarize_current_progress() - + reasoning_handler = AgentReasoning(task=self.task, agent=self.agent) - + reasoning_output = reasoning_handler.handle_mid_execution_reasoning( current_steps=self.iterations, tools_used=self.tools_used, current_progress=current_progress, iteration_messages=self.messages ) - + self.messages.append({ "role": "system", "content": f"I've reassessed my approach based on progress so far. Updated plan:\n\n{reasoning_output.plan.plan}" }) - + self.steps_since_reasoning = 0 - + except Exception as e: self._printer.print( content=f"Error during mid-execution reasoning: {str(e)}", color="red", ) - + def _summarize_current_progress(self) -> str: """ Create a summary of the current execution progress. - + Returns: str: A summary of the current progress. """ recent_messages = self.messages[-5:] if len(self.messages) >= 5 else self.messages - + summary = f"After {self.iterations} steps, " - + if self.tools_used: unique_tools = set(self.tools_used) summary += f"I've used {len(self.tools_used)} tools ({', '.join(unique_tools)}). " else: summary += "I haven't used any tools yet. " - + if recent_messages: last_message = recent_messages[-1].get("content", "") if len(last_message) > 100: last_message = last_message[:100] + "..." summary += f"Most recent action: {last_message}" - + return summary diff --git a/src/crewai/utilities/reasoning_handler.py b/src/crewai/utilities/reasoning_handler.py index bff999ea8..5b4cc1430 100644 --- a/src/crewai/utilities/reasoning_handler.py +++ b/src/crewai/utilities/reasoning_handler.py @@ -386,6 +386,26 @@ class AgentReasoning: ) return self.handle_agent_reasoning() + def _emit_reasoning_event(self, event_class, **kwargs): + """Centralized method for emitting reasoning events.""" + try: + reasoning_trigger = "interval" + if hasattr(self.agent, 'adaptive_reasoning') and self.agent.adaptive_reasoning: + reasoning_trigger = "adaptive" + + crewai_event_bus.emit( + self.agent, + event_class( + agent_role=self.agent.role, + task_id=str(self.task.id), + reasoning_trigger=reasoning_trigger, + **kwargs + ), + ) + except Exception: + # Ignore event bus errors to avoid breaking execution + pass + def handle_mid_execution_reasoning( self, current_steps: int, @@ -405,69 +425,37 @@ class AgentReasoning: Returns: AgentReasoningOutput: Updated reasoning plan based on current context """ - try: - from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningStartedEvent - - reasoning_trigger = "interval" - if self.agent.adaptive_reasoning: - reasoning_trigger = "adaptive" - - crewai_event_bus.emit( - self.agent, - AgentMidExecutionReasoningStartedEvent( - agent_role=self.agent.role, - task_id=str(self.task.id), - current_step=current_steps, - reasoning_trigger=reasoning_trigger, - ), - ) - except Exception: - # Ignore event bus errors to avoid breaking execution - pass + from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningStartedEvent + + self._emit_reasoning_event( + AgentMidExecutionReasoningStartedEvent, + current_step=current_steps + ) try: output = self.__handle_mid_execution_reasoning( current_steps, tools_used, current_progress, iteration_messages ) - # Emit reasoning completed event - try: - from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningCompletedEvent - - reasoning_trigger = "interval" - if self.agent.adaptive_reasoning: - reasoning_trigger = "adaptive" - - crewai_event_bus.emit( - self.agent, - AgentMidExecutionReasoningCompletedEvent( - agent_role=self.agent.role, - task_id=str(self.task.id), - current_step=current_steps, - updated_plan=output.plan.plan, - reasoning_trigger=reasoning_trigger, - ), - ) - except Exception: - pass + # Emit completed event + from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningCompletedEvent + + self._emit_reasoning_event( + AgentMidExecutionReasoningCompletedEvent, + current_step=current_steps, + updated_plan=output.plan.plan + ) return output except Exception as e: - # Emit reasoning failed event - try: - from crewai.utilities.events.reasoning_events import AgentReasoningFailedEvent - - crewai_event_bus.emit( - self.agent, - AgentReasoningFailedEvent( - agent_role=self.agent.role, - task_id=str(self.task.id), - error=str(e), - attempt=1, - ), - ) - except Exception: - pass + # Emit failed event + from crewai.utilities.events.reasoning_events import AgentReasoningFailedEvent + + self._emit_reasoning_event( + AgentReasoningFailedEvent, + error=str(e), + attempt=1 + ) raise diff --git a/tests/reasoning_interval_test.py b/tests/reasoning_interval_test.py index dd25bc193..e354a12f9 100644 --- a/tests/reasoning_interval_test.py +++ b/tests/reasoning_interval_test.py @@ -1,6 +1,5 @@ """Tests for reasoning interval and adaptive reasoning in agents.""" -import json import pytest from unittest.mock import patch, MagicMock @@ -211,3 +210,35 @@ def test_should_trigger_adaptive_reasoning(): {"role": "assistant", "content": "Let me continue with the next step."} ] assert executor._should_adaptive_reason() is False + + +@pytest.mark.parametrize("interval,steps,should_reason", [ + (None, 5, False), + (3, 2, False), + (3, 3, True), + (1, 1, True), + (5, 10, True), +]) +def test_reasoning_interval_scenarios(interval, steps, should_reason): + """Test various reasoning interval scenarios.""" + agent = MagicMock() + agent.reasoning = True + agent.reasoning_interval = interval + agent.adaptive_reasoning = False + + executor = CrewAgentExecutor( + llm=MagicMock(), + task=MagicMock(), + crew=MagicMock(), + agent=agent, + prompt={}, + max_iter=10, + tools=[], + tools_names="", + stop_words=[], + tools_description="", + tools_handler=MagicMock() + ) + + executor.steps_since_reasoning = steps + assert executor._should_trigger_reasoning() is should_reason