Add reasoning_interval and adaptive_reasoning features

Co-Authored-By: Joe Moura <joao@crewai.com>
Author: Devin AI
Date: 2025-05-26 17:32:47 +00:00
parent 22db4aae81
commit 9a2ddb39ce
6 changed files with 595 additions and 1 deletion


@@ -0,0 +1,56 @@
"""
Example demonstrating the new reasoning interval and adaptive reasoning features.
This example shows how to:
1. Use reasoning_interval to make an agent reason every X steps
2. Use adaptive_reasoning to let the agent decide when to reason
"""
from crewai import Agent, Task, Crew
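# WebBrowserTool is illustrative; substitute any browsing/search tool available in your installation.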
from crewai.tools import WebBrowserTool
browser_tool = WebBrowserTool()
interval_agent = Agent(
role="Research Analyst",
goal="Find and analyze information about a specific topic",
backstory="You are a skilled researcher who methodically analyzes information.",
verbose=True,
reasoning=True,
reasoning_interval=3,
tools=[browser_tool]
)
adaptive_agent = Agent(
role="Research Analyst",
goal="Find and analyze information about a specific topic",
backstory="You are a skilled researcher who methodically analyzes information.",
verbose=True,
reasoning=True,
adaptive_reasoning=True,
tools=[browser_tool]
)
research_task = Task(
description="""
Research the latest developments in renewable energy technology.
1. Find information about recent breakthroughs in solar energy
2. Research advancements in wind power technology
3. Analyze trends in energy storage solutions
4. Compare the cost-effectiveness of different renewable energy sources
5. Summarize your findings in a comprehensive report
""",
expected_output="A comprehensive report on the latest developments in renewable energy technology",
agent=interval_agent # Use the interval_agent for this example
)
crew = Crew(
agents=[interval_agent],
tasks=[research_task],
verbose=True
)
result = crew.kickoff()
print("\nResult:")
print(result)
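
The script above only kicks off the interval agent; a minimal follow-up sketch for the adaptive variant, assuming the task's agent can be reassigned after construction:

# Rerun the same task with the adaptive agent (assumes Task.agent is mutable).
research_task.agent = adaptive_agent
adaptive_crew = Crew(
    agents=[adaptive_agent],
    tasks=[research_task],
    verbose=True
)
adaptive_result = adaptive_crew.kickoff()
print("\nAdaptive result:")
print(adaptive_result)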


@@ -135,6 +135,14 @@ class Agent(BaseAgent):
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
)
reasoning_interval: Optional[int] = Field(
default=None,
description="Interval of steps after which the agent should reason again during execution. If None, reasoning only happens before execution.",
)
adaptive_reasoning: bool = Field(
default=False,
description="Whether the agent should adaptively decide when to reason during execution based on context.",
)
embedder: Optional[Dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",


@@ -85,6 +85,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
self.tools_used: List[str] = []
self.steps_since_reasoning = 0
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
@@ -190,6 +192,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer, tool_result
)
self.steps_since_reasoning += 1  # count this step toward the reasoning interval
if self._should_trigger_reasoning():
    self._handle_mid_execution_reasoning()
self._invoke_step_callback(formatted_answer)
self._append_message(formatted_answer.text, role="assistant")
@@ -234,6 +239,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
tool_name = getattr(formatted_answer, "tool", None)
if tool_name and tool_name not in self.tools_used:
    self.tools_used.append(tool_name)
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
@@ -452,3 +461,107 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
color="red",
)
def _should_trigger_reasoning(self) -> bool:
    """
    Determine if mid-execution reasoning should be triggered.

    This is a pure check: the executor loop increments steps_since_reasoning
    once per step, so repeated calls here do not skew the interval count.

    Returns:
        bool: True if reasoning should be triggered, False otherwise.
    """
    if not getattr(self.agent, "reasoning", False):
        return False
    interval = getattr(self.agent, "reasoning_interval", None)
    if interval and self.steps_since_reasoning >= interval:
        return True
    if getattr(self.agent, "adaptive_reasoning", False):
        return self._should_adaptive_reason()
    return False
def _should_adaptive_reason(self) -> bool:
"""
Determine if adaptive reasoning should be triggered based on execution context.
Returns:
bool: True if adaptive reasoning should be triggered, False otherwise.
"""
if len(self.tools_used) >= 3 and len(set(self.tools_used[-3:])) > 1:
return True
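# Heuristic 2: execution has passed the halfway point of the iteration budget.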
if self.iterations > self.max_iter // 2:
return True
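# Heuristic 3: recent messages mention errors or failures.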
error_indicators = ["error", "exception", "failed", "unable to", "couldn't"]
recent_messages = self.messages[-3:]
for message in recent_messages:
content = message.get("content", "").lower()
if any(indicator in content for indicator in error_indicators):
return True
return False
def _handle_mid_execution_reasoning(self) -> None:
"""
Handle mid-execution reasoning by calling the reasoning handler.
"""
if not getattr(self.agent, "reasoning", False):
    return
try:
from crewai.utilities.reasoning_handler import AgentReasoning
current_progress = self._summarize_current_progress()
reasoning_handler = AgentReasoning(task=self.task, agent=self.agent)
reasoning_output = reasoning_handler.handle_mid_execution_reasoning(
current_steps=self.iterations,
tools_used=self.tools_used,
current_progress=current_progress,
iteration_messages=self.messages
)
self.messages.append({
"role": "system",
"content": f"I've reassessed my approach based on progress so far. Updated plan:\n\n{reasoning_output.plan.plan}"
})
self.steps_since_reasoning = 0
except Exception as e:
self._printer.print(
content=f"Error during mid-execution reasoning: {str(e)}",
color="red",
)
def _summarize_current_progress(self) -> str:
"""
Create a summary of the current execution progress.
Returns:
str: A summary of the current progress.
"""
recent_messages = self.messages[-5:]
summary = f"After {self.iterations} steps, "
if self.tools_used:
unique_tools = list(dict.fromkeys(self.tools_used))  # de-duplicated, first-use order
summary += f"I've used {len(self.tools_used)} tools ({', '.join(unique_tools)}). "
else:
summary += "I haven't used any tools yet. "
if recent_messages:
last_message = recent_messages[-1].get("content", "")
if len(last_message) > 100:
last_message = last_message[:100] + "..."
summary += f"Most recent action: {last_message}"
return summary
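
For example, after four steps with tools_used == ["search", "search", "calc"], this produces: "After 4 steps, I've used 3 tools (search, calc). Most recent action: ...".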


@@ -28,4 +28,25 @@ class AgentReasoningFailedEvent(BaseEvent):
agent_role: str
task_id: str
error: str
attempt: int = 1
class AgentMidExecutionReasoningStartedEvent(BaseEvent):
"""Event emitted when an agent starts mid-execution reasoning."""
type: str = "agent_mid_execution_reasoning_started"
agent_role: str
task_id: str
current_step: int
reasoning_trigger: str # "interval" or "adaptive"
class AgentMidExecutionReasoningCompletedEvent(BaseEvent):
"""Event emitted when an agent completes mid-execution reasoning."""
type: str = "agent_mid_execution_reasoning_completed"
agent_role: str
task_id: str
current_step: int
updated_plan: str
reasoning_trigger: str
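
Downstream code can observe these events; a minimal listener sketch, assuming the crewai_event_bus.on(...) decorator registration that crewai's event bus exposes:

from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.reasoning_events import (
    AgentMidExecutionReasoningStartedEvent,
    AgentMidExecutionReasoningCompletedEvent,
)

@crewai_event_bus.on(AgentMidExecutionReasoningStartedEvent)
def log_reasoning_started(source, event):
    # Fires once per mid-execution reasoning pass, before the LLM call.
    print(f"[{event.reasoning_trigger}] step {event.current_step}: reasoning started")

@crewai_event_bus.on(AgentMidExecutionReasoningCompletedEvent)
def log_reasoning_completed(source, event):
    # event.updated_plan carries the full revised plan text.
    print(f"step {event.current_step}: plan updated ({len(event.updated_plan)} chars)")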


@@ -385,3 +385,186 @@ class AgentReasoning:
"The _handle_agent_reasoning method is deprecated. Use handle_agent_reasoning instead."
)
return self.handle_agent_reasoning()
def handle_mid_execution_reasoning(
self,
current_steps: int,
tools_used: list,
current_progress: str,
iteration_messages: list
) -> AgentReasoningOutput:
"""
Handle reasoning during task execution with context about current progress.
Args:
current_steps: Number of steps executed so far
tools_used: List of tools that have been used
current_progress: Summary of progress made so far
iteration_messages: Recent conversation messages
Returns:
AgentReasoningOutput: Updated reasoning plan based on current context
"""
try:
from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningStartedEvent
reasoning_trigger = "interval"
if self.agent.adaptive_reasoning:
reasoning_trigger = "adaptive"
crewai_event_bus.emit(
self.agent,
AgentMidExecutionReasoningStartedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
current_step=current_steps,
reasoning_trigger=reasoning_trigger,
),
)
except Exception:
# Ignore event bus errors to avoid breaking execution
pass
try:
output = self.__handle_mid_execution_reasoning(
current_steps, tools_used, current_progress, iteration_messages
)
# Emit reasoning completed event
try:
from crewai.utilities.events.reasoning_events import AgentMidExecutionReasoningCompletedEvent
reasoning_trigger = "interval"
if self.agent.adaptive_reasoning:
reasoning_trigger = "adaptive"
crewai_event_bus.emit(
self.agent,
AgentMidExecutionReasoningCompletedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
current_step=current_steps,
updated_plan=output.plan.plan,
reasoning_trigger=reasoning_trigger,
),
)
except Exception:
pass
return output
except Exception as e:
# Emit reasoning failed event
try:
from crewai.utilities.events.reasoning_events import AgentReasoningFailedEvent
crewai_event_bus.emit(
self.agent,
AgentReasoningFailedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
error=str(e),
attempt=1,
),
)
except Exception:
pass
raise
def __handle_mid_execution_reasoning(
self,
current_steps: int,
tools_used: list,
current_progress: str,
iteration_messages: list
) -> AgentReasoningOutput:
"""
Private method that handles the mid-execution reasoning process.
Args:
current_steps: Number of steps executed so far
tools_used: List of tools that have been used
current_progress: Summary of progress made so far
iteration_messages: Recent conversation messages
Returns:
AgentReasoningOutput: The output of the mid-execution reasoning process.
"""
mid_execution_prompt = self.__create_mid_execution_prompt(
current_steps, tools_used, current_progress, iteration_messages
)
if self.llm.supports_function_calling():
plan, ready = self.__call_with_function(mid_execution_prompt, "mid_execution_plan")
else:
system_prompt = self.i18n.retrieve("reasoning", "mid_execution_plan").format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory()
)
response = self.llm.call(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": mid_execution_prompt}
]
)
plan, ready = self.__parse_reasoning_response(str(response))
reasoning_plan = ReasoningPlan(plan=plan, ready=ready)
return AgentReasoningOutput(plan=reasoning_plan)
def __create_mid_execution_prompt(
self,
current_steps: int,
tools_used: list,
current_progress: str,
iteration_messages: list
) -> str:
"""
Creates a prompt for the agent to reason during task execution.
Args:
current_steps: Number of steps executed so far
tools_used: List of tools that have been used
current_progress: Summary of progress made so far
iteration_messages: Recent conversation messages
Returns:
str: The mid-execution reasoning prompt.
"""
tools_used_str = ", ".join(tools_used) if tools_used else "No tools used yet"
recent_messages = ""
if iteration_messages:
    recent_msgs = iteration_messages[-6:]
    for msg in recent_msgs:
        role = msg.get("role", "unknown")
        content = msg.get("content", "")
        if content:
            snippet = content[:200] + "..." if len(content) > 200 else content
            recent_messages += f"{role.upper()}: {snippet}\n\n"
return f"""You are currently executing a task and need to reassess your plan based on progress so far.
TASK DESCRIPTION:
{self.task.description}
EXPECTED OUTPUT:
{self.task.expected_output}
CURRENT PROGRESS:
Steps completed: {current_steps}
Tools used: {tools_used_str}
Progress summary: {current_progress}
RECENT CONVERSATION:
{recent_messages}
Based on the current progress and context, please reassess your plan for completing this task.
Consider what has been accomplished, what challenges you've encountered, and what steps remain.
Adjust your strategy if needed or confirm your current approach is still optimal.
Provide a detailed updated plan for completing the task.
End with "READY: I am ready to continue executing the task." if you're confident in your plan.
"""


@@ -0,0 +1,213 @@
"""Tests for reasoning interval and adaptive reasoning in agents."""
import json
import pytest
from unittest.mock import patch, MagicMock
from crewai import Agent, Task
from crewai.llm import LLM
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.utilities.reasoning_handler import AgentReasoning
@pytest.fixture
def mock_llm_responses():
"""Fixture for mock LLM responses."""
return {
"initial_reasoning": "I'll solve this task step by step.\n\nREADY: I am ready to execute the task.\n\n",
"mid_execution_reasoning": "Based on progress so far, I'll adjust my approach.\n\nREADY: I am ready to continue executing the task.",
"execution_step": "I'm working on the task...",
"final_result": "Task completed successfully."
}
def test_agent_with_reasoning_interval(mock_llm_responses):
"""Test agent with reasoning interval."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning interval feature",
backstory="I am a test agent created to verify the reasoning interval feature works correctly.",
llm=llm,
reasoning=True,
reasoning_interval=2, # Reason every 2 steps
verbose=True
)
task = Task(
description="Multi-step task that requires periodic reasoning.",
expected_output="The task should be completed with periodic reasoning.",
agent=agent
)
# The pre-execution reasoning pass calls the LLM directly, so stub it out.
agent.llm.supports_function_calling = MagicMock(return_value=False)
agent.llm.call = MagicMock(return_value=mock_llm_responses["initial_reasoning"])
with patch('crewai.agent.Agent.create_agent_executor') as mock_create_executor:
    mock_executor = MagicMock()
    # execute_task reads the executor result's "output" key.
    mock_executor.invoke.return_value = {"output": mock_llm_responses["final_result"]}
    mock_create_executor.return_value = mock_executor
    result = agent.execute_task(task)
assert result == mock_llm_responses["final_result"]
# The executor is fully mocked here, so mid-execution reasoning cannot fire;
# assert that the interval configuration reached the agent instead.
assert agent.reasoning_interval == 2
def test_agent_with_adaptive_reasoning(mock_llm_responses):
"""Test agent with adaptive reasoning."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the adaptive reasoning feature",
backstory="I am a test agent created to verify the adaptive reasoning feature works correctly.",
llm=llm,
reasoning=True,
adaptive_reasoning=True,
verbose=True
)
task = Task(
description="Complex task that requires adaptive reasoning.",
expected_output="The task should be completed with adaptive reasoning.",
agent=agent
)
agent.llm.supports_function_calling = MagicMock(return_value=False)
agent.llm.call = MagicMock(return_value=mock_llm_responses["initial_reasoning"])
with patch('crewai.agent.Agent.create_agent_executor') as mock_create_executor:
    mock_executor = MagicMock()
    mock_executor.invoke.return_value = {"output": mock_llm_responses["final_result"]}
    mock_create_executor.return_value = mock_executor
    result = agent.execute_task(task)
assert result == mock_llm_responses["final_result"]
# Adaptive triggering happens inside the real executor loop, which is mocked
# out here, so assert on the configuration rather than on mock internals.
assert agent.adaptive_reasoning is True
def test_mid_execution_reasoning_handler():
"""Test the mid-execution reasoning handler."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the mid-execution reasoning handler",
backstory="I am a test agent created to verify the mid-execution reasoning handler works correctly.",
llm=llm,
reasoning=True,
verbose=True
)
task = Task(
description="Task to test mid-execution reasoning handler.",
expected_output="The mid-execution reasoning handler should work correctly.",
agent=agent
)
# Force the plain-text reasoning path so the mocked llm.call response is parsed directly.
agent.llm.supports_function_calling = MagicMock(return_value=False)
agent.llm.call = MagicMock(return_value="Based on progress, I'll adjust my approach.\n\nREADY: I am ready to continue executing the task.")
reasoning_handler = AgentReasoning(task=task, agent=agent)
result = reasoning_handler.handle_mid_execution_reasoning(
current_steps=3,
tools_used=["search_tool", "calculator_tool"],
current_progress="Made progress on steps 1-3",
iteration_messages=[
{"role": "assistant", "content": "I'll search for information."},
{"role": "system", "content": "Search results: ..."},
{"role": "assistant", "content": "I'll calculate the answer."},
{"role": "system", "content": "Calculation result: 42"}
]
)
assert result is not None
assert hasattr(result, 'plan')
assert hasattr(result.plan, 'plan')
assert hasattr(result.plan, 'ready')
assert result.plan.ready is True
def test_should_trigger_reasoning_interval():
"""Test the _should_trigger_reasoning method with interval-based reasoning."""
agent = MagicMock()
agent.reasoning = True
agent.reasoning_interval = 3
agent.adaptive_reasoning = False
executor = CrewAgentExecutor(
llm=MagicMock(stop=[]),  # __init__ reads llm.stop, which must be iterable
task=MagicMock(),
crew=MagicMock(),
agent=agent,
prompt={},
max_iter=10,
tools=[],
tools_names="",
stop_words=[],
tools_description="",
tools_handler=MagicMock()
)
executor.steps_since_reasoning = 0
assert executor._should_trigger_reasoning() is False
executor.steps_since_reasoning = 2
assert executor._should_trigger_reasoning() is False
executor.steps_since_reasoning = 3
assert executor._should_trigger_reasoning() is True
executor.steps_since_reasoning = 4
assert executor._should_trigger_reasoning() is True
def test_should_trigger_adaptive_reasoning():
"""Test the _should_adaptive_reason method."""
agent = MagicMock()
agent.reasoning = True
agent.reasoning_interval = None
agent.adaptive_reasoning = True
executor = CrewAgentExecutor(
llm=MagicMock(stop=[]),  # __init__ reads llm.stop, which must be iterable
task=MagicMock(),
crew=MagicMock(),
agent=agent,
prompt={},
max_iter=10,
tools=[],
tools_names="",
stop_words=[],
tools_description="",
tools_handler=MagicMock()
)
executor.tools_used = ["tool1", "tool2", "tool3"]
assert executor._should_adaptive_reason() is True
executor.tools_used = ["tool1", "tool1", "tool1"]
executor.iterations = 6 # > max_iter // 2
assert executor._should_adaptive_reason() is True
executor.tools_used = ["tool1", "tool1", "tool1"]
executor.iterations = 2
executor.messages = [
{"role": "assistant", "content": "I'll try this approach."},
{"role": "system", "content": "Error: Failed to execute the command."},
{"role": "assistant", "content": "Let me try something else."}
]
assert executor._should_adaptive_reason() is True
executor.tools_used = ["tool1", "tool1", "tool1"]
executor.iterations = 2
executor.messages = [
{"role": "assistant", "content": "I'll try this approach."},
{"role": "system", "content": "Command executed successfully."},
{"role": "assistant", "content": "Let me continue with the next step."}
]
assert executor._should_adaptive_reason() is False