Make step timeouts, max_replans, and max_step_iterations configurable

This commit is contained in:
lorenzejay
2026-03-12 14:26:36 -07:00
parent 5cb3a02b03
commit b704089efd
3 changed files with 91 additions and 8 deletions

View File

@@ -107,6 +107,27 @@ class PlanningConfig(BaseModel):
default=None,
description="Custom prompt for refining the plan.",
)
# Cap on full replanning attempts. `ge=0` means 0 is an accepted value.
max_replans: int = Field(
default=3,
description="Maximum number of full replanning attempts before finalizing.",
ge=0,
)
# Upper bound on LLM calls made for a single step; validated to be at least 1.
max_step_iterations: int = Field(
default=15,
description=(
"Maximum LLM iterations per step in the StepExecutor multi-turn loop. "
"Lower values make steps faster but less thorough."
),
ge=1,
)
# Optional per-step time budget in seconds; None (the default) disables it.
# NOTE(review): no lower bound is declared here. The StepExecutor loops guard
# the check with `if step_timeout and start_time`, so a value of 0 is falsy
# and behaves like None (no timeout) — confirm that is intended.
# NOTE(review): the visible loops test the budget at the top of each
# iteration only, so this looks like a between-iterations check rather than
# a hard interrupt of an in-flight LLM call — confirm against StepExecutor.
step_timeout: int | None = Field(
default=None,
description=(
"Maximum wall-clock seconds for a single step execution. "
"If exceeded, the step is marked as failed and observation decides "
"whether to continue or replan. None means no per-step timeout."
),
)
llm: str | Any | None = Field(
default=None,
description="LLM to use for planning. Uses agent's LLM if None.",

View File

@@ -127,7 +127,13 @@ class StepExecutor:
# Public API
# ------------------------------------------------------------------
def execute(self, todo: TodoItem, context: StepExecutionContext) -> StepResult:
def execute(
self,
todo: TodoItem,
context: StepExecutionContext,
max_step_iterations: int = 15,
step_timeout: int | None = None,
) -> StepResult:
"""Execute a single todo item using a multi-turn action loop.
Enforces the RPM limit, builds a fresh message list, then iterates
@@ -138,6 +144,8 @@ class StepExecutor:
Args:
todo: The todo item to execute.
context: Immutable context with task info and dependency results.
max_step_iterations: Maximum LLM iterations in the multi-turn loop.
step_timeout: Maximum wall-clock seconds for this step. None = no limit.
Returns:
StepResult with the outcome.
@@ -150,9 +158,19 @@ class StepExecutor:
messages = self._build_isolated_messages(todo, context)
if self._use_native_tools:
result_text = self._execute_native(messages, tool_calls_made)
result_text = self._execute_native(
messages, tool_calls_made,
max_step_iterations=max_step_iterations,
step_timeout=step_timeout,
start_time=start_time,
)
else:
result_text = self._execute_text_parsed(messages, tool_calls_made)
result_text = self._execute_text_parsed(
messages, tool_calls_made,
max_step_iterations=max_step_iterations,
step_timeout=step_timeout,
start_time=start_time,
)
self._validate_expected_tool_usage(todo, tool_calls_made)
elapsed = time.monotonic() - start_time
@@ -298,6 +316,8 @@ class StepExecutor:
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
step_timeout: int | None = None,
start_time: float | None = None,
) -> str:
"""Execute step using text-parsed tool calling with a multi-turn loop.
@@ -310,6 +330,11 @@ class StepExecutor:
last_tool_result = ""
for _ in range(max_step_iterations):
# Check step timeout
if step_timeout and start_time:
elapsed = time.monotonic() - start_time
if elapsed >= step_timeout:
return last_tool_result or f"Step timed out after {elapsed:.0f}s"
answer = self.llm.call(
messages,
callbacks=self.callbacks,
@@ -504,6 +529,8 @@ class StepExecutor:
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
step_timeout: int | None = None,
start_time: float | None = None,
) -> str:
"""Execute step using native function calling with a multi-turn loop.
@@ -515,6 +542,11 @@ class StepExecutor:
accumulated_results: list[str] = []
for _ in range(max_step_iterations):
# Check step timeout
if step_timeout and start_time:
elapsed = time.monotonic() - start_time
if elapsed >= step_timeout:
return "\n\n".join(accumulated_results) if accumulated_results else f"Step timed out after {elapsed:.0f}s"
answer = self.llm.call(
messages,
tools=self._openai_tools,

View File

@@ -451,6 +451,27 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
return config.reasoning_effort
return "medium"
def _get_max_replans(self) -> int:
"""Get max replans from planning config or default to 3."""
config = getattr(self.agent, "planning_config", None)
if config is not None and hasattr(config, "max_replans"):
return config.max_replans
return 3
def _get_max_step_iterations(self) -> int:
"""Get max step iterations from planning config or default to 15."""
config = getattr(self.agent, "planning_config", None)
if config is not None and hasattr(config, "max_step_iterations"):
return config.max_step_iterations
return 15
def _get_step_timeout(self) -> int | None:
"""Get per-step timeout from planning config or default to None."""
config = getattr(self.agent, "planning_config", None)
if config is not None and hasattr(config, "step_timeout"):
return config.step_timeout
return None
def _build_context_for_todo(self, todo: TodoItem) -> StepExecutionContext:
"""Build an isolated execution context for a single todo.
@@ -861,7 +882,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
Preserves completed todo results and replaces only pending steps.
"""
max_replans = 3
max_replans = self._get_max_replans()
if self.state.replan_count >= max_replans:
if self.agent.verbose:
@@ -1009,7 +1030,12 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
step_executor = self._ensure_step_executor()
context = self._build_context_for_todo(current)
result = step_executor.execute(current, context)
result = step_executor.execute(
current,
context,
max_step_iterations=self._get_max_step_iterations(),
step_timeout=self._get_step_timeout(),
)
# Store result on the todo (do NOT mark completed — observation decides)
current.result = result.result
@@ -1119,7 +1145,11 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
async def _run_step(todo: TodoItem) -> tuple[TodoItem, object]:
step_executor = self._ensure_step_executor()
context = self._build_context_for_todo(todo)
result = await asyncio.to_thread(step_executor.execute, todo, context)
result = await asyncio.to_thread(
step_executor.execute, todo, context,
self._get_max_step_iterations(),
self._get_step_timeout(),
)
return todo, result
gathered = await asyncio.gather(
@@ -2480,7 +2510,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
Returns:
Tuple of (should_replan: bool, reason: str)
"""
max_replans = 3 # Maximum number of replanning attempts
max_replans = self._get_max_replans()
# Don't replan if we've hit the limit
if self.state.replan_count >= max_replans:
@@ -2677,7 +2707,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
Called when dynamic replanning is triggered. Regenerates the plan
and routes back to todo-driven execution.
"""
max_replans = 3
max_replans = self._get_max_replans()
if self.state.replan_count >= max_replans:
if self.agent.verbose: