Execute todos and be able to track them

This commit is contained in:
lorenzejay
2026-02-05 10:51:54 -08:00
parent 7e1ae7226b
commit 81d9fd4ab3
4 changed files with 960 additions and 23 deletions

View File

@@ -1973,12 +1973,19 @@ class Agent(BaseAgent):
else str(raw_output)
)
# Extract todo execution results from executor state
todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
return LiteAgentOutput(
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
plan=executor.state.plan,
todos=todo_results,
replan_count=executor.state.replan_count,
last_replan_reason=executor.state.last_replan_reason,
)
async def _execute_and_build_output_async(
@@ -2051,12 +2058,19 @@ class Agent(BaseAgent):
else str(raw_output)
)
# Extract todo execution results from executor state
todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
return LiteAgentOutput(
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
plan=executor.state.plan,
todos=todo_results,
replan_count=executor.state.replan_count,
last_replan_reason=executor.state.last_replan_reason,
)
def _process_kickoff_guardrail(

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from datetime import datetime
import json
@@ -102,6 +103,12 @@ class AgentReActState(BaseModel):
todos: TodoList = Field(
default_factory=TodoList, description="Todo list for tracking plan execution"
)
replan_count: int = Field(
default=0, description="Number of times the plan has been regenerated"
)
last_replan_reason: str | None = Field(
default=None, description="Reason for the last replan, if any"
)
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@@ -389,9 +396,278 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.todos = TodoList(items=todos)
@listen(generate_plan)
# -------------------------------------------------------------------------
# Todo-Driven Execution Flow
# -------------------------------------------------------------------------
@router(generate_plan)
def check_todos_available(
    self,
) -> Literal["has_todos", "no_todos", "planning_disabled"]:
    """Route execution based on the outcome of planning.

    Falls back to the standard flow when planning is disabled or when
    planning produced no todo items; otherwise enters the todo-driven
    execution flow.
    """
    planning_on = getattr(self.agent, "planning_enabled", False)
    if not planning_on:
        return "planning_disabled"
    has_items = bool(self.state.todos.items)
    return "has_todos" if has_items else "no_todos"
@router("has_todos")
def get_ready_todos_method(
    self,
) -> Literal["single_todo_ready", "multiple_todos_ready", "all_todos_complete"]:
    """Inspect dependency state and pick the next execution mode.

    A single ready todo is executed sequentially; several ready todos
    are dispatched in parallel; none ready means everything is done.
    """
    ready_items = self.state.todos.get_ready_todos()

    # DEBUG: Trace todo readiness
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] get_ready_todos_method: found {len(ready_items)} ready todos",
            color="cyan",
        )
        for todo in self.state.todos.items:
            self._printer.print(
                content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}, desc={todo.description[:50]}...",
                color="cyan",
            )

    if not ready_items:
        return "all_todos_complete"

    if len(ready_items) > 1:
        # Several independent todos can be fanned out concurrently.
        if self.agent.verbose:
            self._printer.print(
                content="[DEBUG] Multiple todos ready -> multiple_todos_ready",
                color="cyan",
            )
        return "multiple_todos_ready"

    # Exactly one ready todo: mark it running and execute sequentially.
    self.state.todos.mark_running(ready_items[0].step_number)
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] Marked todo {ready_items[0].step_number} as running -> single_todo_ready",
            color="cyan",
        )
    return "single_todo_ready"
@router("single_todo_ready")
def execute_todo_sequential(self) -> Literal["todo_injected"]:
    """Inject the current todo's context so the agent works on that step.

    Appends a step-focused user prompt to the conversation before the
    reasoning loop resumes.
    """
    active = self.state.todos.current_todo

    # DEBUG: Trace starting todo execution
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] execute_todo_sequential: starting todo {active.step_number if active else None}",
            color="cyan",
        )
        if active:
            self._printer.print(
                content=f"[DEBUG] Description: {active.description[:60]}...",
                color="cyan",
            )

    if active is not None:
        self._inject_todo_context(active)
    return "todo_injected"
def _inject_todo_context(self, todo: TodoItem) -> None:
    """Append a user message focused on a single todo to the conversation.

    Args:
        todo: The todo item whose prompt should be injected.
    """
    message: LLMMessage = {
        "role": "user",
        "content": self._build_todo_prompt(todo),
    }
    self.state.messages.append(message)
def _build_todo_prompt(self, todo: TodoItem) -> str:
    """Compose a prompt that focuses the agent on one plan step.

    Includes the step description, an optional tool suggestion, and
    truncated results from any completed dependency steps.

    Args:
        todo: The todo item to build a prompt for.

    Returns:
        A newline-joined prompt string for this specific step.
    """
    step_count = len(self.state.todos.items)
    lines: list[str] = [
        f"**Current Step {todo.step_number}/{step_count}**",
        f"Task: {todo.description}",
    ]
    if todo.tool_to_use:
        lines.append(f"Suggested tool: {todo.tool_to_use}")

    # Surface results from completed dependencies as context.
    dependency_lines: list[str] = []
    for dep_num in todo.depends_on:
        dep = self.state.todos.get_by_step_number(dep_num)
        if dep is None or not dep.result:
            continue
        # Cap long results so the prompt stays small.
        if len(dep.result) > 500:
            result_preview = dep.result[:500] + "..."
        else:
            result_preview = dep.result
        dependency_lines.append(f"Step {dep_num} result: {result_preview}")
    if dependency_lines:
        lines.append("\nContext from previous steps:")
        lines.extend(dependency_lines)

    lines.append("\nComplete this step. Once done, provide your result.")
    return "\n".join(lines)
@router("multiple_todos_ready")
async def execute_todos_parallel(self) -> Literal["parallel_todos_complete"]:
    """Run every dependency-satisfied todo concurrently.

    All ready todos are marked running, executed via asyncio.gather, and
    then marked completed — failures are recorded as "Error: ..." result
    strings so later replan checks can detect them.
    """
    ready = self.state.todos.get_ready_todos()

    # Flag everything we are about to run.
    for todo in ready:
        self.state.todos.mark_running(todo.step_number)

    # Fan out; return_exceptions keeps one failure from cancelling siblings.
    outcomes = await asyncio.gather(
        *(self._execute_single_todo_async(todo) for todo in ready),
        return_exceptions=True,
    )

    # Record each outcome; exceptions become "Error: ..." results.
    for todo, outcome in zip(ready, outcomes, strict=True):
        if isinstance(outcome, Exception):
            error_msg = f"Error: {outcome!s}"
            self.state.todos.mark_completed(todo.step_number, result=error_msg)
            if self.agent.verbose:
                self._printer.print(
                    content=f"Todo {todo.step_number} failed: {error_msg}",
                    color="red",
                )
        else:
            self.state.todos.mark_completed(todo.step_number, result=str(outcome))
            if self.agent.verbose:
                self._printer.print(
                    content=f"Todo {todo.step_number} completed",
                    color="green",
                )
    return "parallel_todos_complete"
async def _execute_single_todo_async(self, todo: TodoItem) -> str:
    """Run one todo item off the event loop and return its result text.

    Builds a focused two-message conversation for the step, optionally
    exposing native tools when the todo names one, and executes any
    returned tool calls. Errors are returned as strings rather than
    raised so parallel execution can record them per-todo.

    Args:
        todo: The todo item to execute.

    Returns:
        The result of executing the todo, or an error description.
    """
    conversation: list[LLMMessage] = [
        {"role": "system", "content": self._get_todo_system_prompt()},
        {"role": "user", "content": self._build_todo_prompt(todo)},
    ]

    # Tool-assisted path: only when the todo names a tool and native
    # function calling is available.
    if todo.tool_to_use and self.state.use_native_tools:
        try:
            response = await asyncio.to_thread(
                self.llm.call,
                conversation,
                tools=self._openai_tools,
                available_functions=self._available_functions,
            )
            if isinstance(response, list) and response:
                # The LLM asked for tool calls; run each one we recognize.
                outputs: list[str] = []
                for tool_call in response:
                    info = extract_tool_call_info(tool_call)
                    if not info:
                        continue
                    _call_id, func_name, func_args = info
                    if func_name not in self._available_functions:
                        continue
                    if isinstance(func_args, str):
                        # Tolerate malformed JSON by falling back to no args.
                        try:
                            args_dict = json.loads(func_args)
                        except json.JSONDecodeError:
                            args_dict = {}
                    else:
                        args_dict = func_args
                    outputs.append(
                        str(self._available_functions[func_name](**args_dict))
                    )
                return "\n".join(outputs) if outputs else str(response)
            return str(response)
        except Exception as e:
            return f"Tool execution error: {e!s}"

    # No tool requested (or native tools unavailable): plain LLM call.
    try:
        return str(await asyncio.to_thread(self.llm.call, conversation))
    except Exception as e:
        return f"LLM call error: {e!s}"
def _get_todo_system_prompt(self) -> str:
    """Return the system prompt used for focused single-step execution.

    Returns:
        A system prompt for focused step execution.
    """
    agent_role = self.agent.role if self.agent else "Assistant"
    agent_goal = self.agent.goal if self.agent else "Complete tasks efficiently"
    return f"""You are {agent_role}. Your goal: {agent_goal}
You are executing a specific step in a multi-step plan. Focus only on completing
the current step. Use the suggested tool if one is provided. Be concise and
provide clear results that can be used by subsequent steps."""
@router("parallel_todos_complete")
def after_parallel_execution(
    self,
) -> Literal["has_todos", "all_todos_complete", "needs_replan"]:
    """Decide what to do once a parallel batch of todos has finished.

    Replanning takes priority over continuing; otherwise route based on
    whether any todos remain.
    """
    should_replan, reason = self._should_replan()
    if should_replan:
        self.state.last_replan_reason = reason
        return "needs_replan"
    return "all_todos_complete" if self.state.todos.is_complete else "has_todos"
@router(or_("todo_injected", "no_todos", "planning_disabled"))
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
"""Initialize the reasoning flow and emit agent start logs.
This is called either after todo context is injected, or when
there are no todos (falling back to standard execution).
"""
self._show_start_logs()
# Check for native tool support on first iteration
if self.state.iterations == 0:
@@ -400,7 +676,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@router("force_final_answer")
def force_final_answer(self) -> Literal["agent_finished"]:
"""Force agent to provide final answer when max iterations exceeded."""
formatted_answer = handle_max_iterations_exceeded(
@@ -418,7 +694,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "agent_finished"
@listen("continue_reasoning")
@router("continue_reasoning")
def call_llm_and_parse(self) -> Literal["parsed", "parser_error", "context_error"]:
"""Execute LLM call with hooks and parse the response.
@@ -484,15 +760,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
raise
@listen("continue_reasoning_native")
@router("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
) -> Literal[
"native_tool_calls", "native_finished", "context_error", "todo_satisfied"
]:
"""Execute LLM call with native function calling.
Always calls the LLM so it can read reflection prompts and decide
whether to provide a final answer or request more tools.
When todos are active and the LLM produces a final answer, we treat it
as completing the current todo rather than finishing the entire task.
Returns routing decision based on whether tool calls or final answer.
"""
try:
@@ -534,7 +815,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer.model_dump_json())
return "native_finished"
return self._route_finish_with_todos("native_finished")
# Text response - this is the final answer
if isinstance(answer, str):
@@ -546,7 +827,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)
return "native_finished"
return self._route_finish_with_todos("native_finished")
# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
@@ -557,7 +838,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
return self._route_finish_with_todos("native_finished")
except Exception as e:
if is_context_length_exceeded(e):
@@ -568,14 +849,60 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
raise
def _route_finish_with_todos(
    self, default_route: str
) -> Literal["native_finished", "agent_finished", "todo_satisfied"]:
    """Redirect a finish event to todo processing when todos remain.

    Args:
        default_route: The route to emit when no todos are pending.

    Returns:
        "todo_satisfied" when an unfinished todo is active, otherwise
        the provided default route.
    """
    todos = self.state.todos
    if todos.items and not todos.is_complete:
        active = todos.current_todo
        if active is not None:
            if self.agent.verbose:
                self._printer.print(
                    content=f"[DEBUG] Finish with pending todos -> treating as todo_satisfied for todo {active.step_number}",
                    color="cyan",
                )
            return "todo_satisfied"
    return default_route  # type: ignore[return-value]
@router(call_llm_and_parse)
def route_by_answer_type(
    self,
) -> Literal["execute_tool", "agent_finished", "todo_satisfied"]:
    """Route based on whether answer is AgentAction or AgentFinish.

    When todos are active and the LLM produces a final answer, we treat it
    as completing the current todo rather than finishing the entire task.
    """
    # NOTE: this span previously contained leftover pre-change diff lines
    # (an old signature and an old `return "agent_finished"`); only the
    # new implementation is kept here.
    # DEBUG: Trace routing decision
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] route_by_answer_type: answer_type={type(self.state.current_answer).__name__}",
            color="cyan",
        )
        if self.state.todos.items:
            pending = [t for t in self.state.todos.items if t.status == "pending"]
            running = [t for t in self.state.todos.items if t.status == "running"]
            self._printer.print(
                content=f"[DEBUG] Todos: {len(pending)} pending, {len(running)} running, current={self.state.todos.current_todo}",
                color="cyan",
            )
    if isinstance(self.state.current_answer, AgentAction):
        return "execute_tool"
    # Final answers are treated as completing the current todo when one
    # is active; otherwise they finish the task.
    return self._route_finish_with_todos("agent_finished")
@router("execute_tool")
def execute_tool_action(self) -> Literal["tool_completed", "tool_result_is_final"]:
"""Execute the tool action and handle the result."""
@@ -641,7 +968,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(error_text)
raise
@listen("native_tool_calls")
@router("native_tool_calls")
def execute_native_tool(
self,
) -> Literal["native_tool_completed", "tool_result_is_final"]:
@@ -925,10 +1252,50 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "unknown"
@router(execute_native_tool)
def increment_native_and_continue(self) -> Literal["initialized"]:
    """Increment iteration counter after native tool execution."""
    # Routes back to "initialized" so the reasoning loop runs another pass.
    self.state.iterations += 1
    return "initialized"
def check_native_todo_completion(
    self,
) -> Literal["todo_satisfied", "todo_not_satisfied"]:
    """Check if the native tool execution satisfied the active todo.

    Mirrors check_todo_completion for the native tool path: any native
    tool execution counts as satisfying the current todo, whether or not
    a specific tool was requested.
    """
    active = self.state.todos.current_todo

    # DEBUG: Trace native todo completion check
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] check_native_todo_completion: current_todo={active.step_number if active else None}",
            color="cyan",
        )

    if active is None:
        # Nothing to satisfy; carry on with the normal iteration loop.
        if self.agent.verbose:
            self._printer.print(
                content="[DEBUG] No current todo -> todo_not_satisfied",
                color="cyan",
            )
        return "todo_not_satisfied"

    # Both branches below satisfy the todo; only the debug text differs.
    if self.agent.verbose:
        if active.tool_to_use:
            self._printer.print(
                content=f"[DEBUG] Native tool execution for todo {active.step_number} -> todo_satisfied",
                color="cyan",
            )
        else:
            self._printer.print(
                content=f"[DEBUG] Any native tool use counts for todo {active.step_number} -> todo_satisfied",
                color="cyan",
            )
    return "todo_satisfied"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
@@ -949,14 +1316,208 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "continue_reasoning"
@router(execute_tool_action)
def check_todo_completion(
    self,
) -> Literal["todo_satisfied", "todo_not_satisfied"]:
    """Check if the current tool execution satisfied the active todo.

    A todo counts as satisfied when the expected tool was used (or any
    tool, when none was specified), or when the agent produced a final
    answer for this step.
    """
    active = self.state.todos.current_todo

    # DEBUG: Trace todo completion check
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] check_todo_completion: current_todo={active.step_number if active else None}, answer_type={type(self.state.current_answer).__name__}",
            color="cyan",
        )

    if active is None:
        # No active todo, continue with normal iteration
        if self.agent.verbose:
            self._printer.print(
                content="[DEBUG] No current todo -> todo_not_satisfied",
                color="cyan",
            )
        return "todo_not_satisfied"

    answer = self.state.current_answer

    if isinstance(answer, AgentAction):
        expected = active.tool_to_use
        if expected and answer.tool == expected:
            # The tool the plan asked for was executed.
            if self.agent.verbose:
                self._printer.print(
                    content=f"[DEBUG] Expected tool {expected} matched -> todo_satisfied",
                    color="cyan",
                )
            return "todo_satisfied"
        if not expected:
            # No specific tool expected, any tool use counts
            if self.agent.verbose:
                self._printer.print(
                    content=f"[DEBUG] Any tool use counts (used {answer.tool}) -> todo_satisfied",
                    color="cyan",
                )
            return "todo_satisfied"

    if isinstance(answer, AgentFinish):
        # A final answer for the step also satisfies the todo.
        if self.agent.verbose:
            self._printer.print(
                content="[DEBUG] AgentFinish received -> todo_satisfied",
                color="cyan",
            )
        return "todo_satisfied"

    if self.agent.verbose:
        self._printer.print(
            content="[DEBUG] No satisfaction condition met -> todo_not_satisfied",
            color="cyan",
        )
    return "todo_not_satisfied"
@listen("todo_satisfied")
def mark_todo_complete(self) -> Literal["todo_marked"]:
    """Record the current todo as completed, capturing its result text."""
    active = self.state.todos.current_todo

    # DEBUG: Trace marking todo complete
    if self.agent.verbose:
        self._printer.print(
            content=f"[DEBUG] mark_todo_complete called: current_todo={active.step_number if active else None}",
            color="cyan",
        )

    if active is None:
        if self.agent.verbose:
            self._printer.print(
                content="[DEBUG] No current todo to mark -> todo_marked",
                color="cyan",
            )
        return "todo_marked"

    # Derive the result text from the most recent answer.
    answer = self.state.current_answer
    result = ""
    if isinstance(answer, AgentFinish):
        result = str(answer.output)
    elif isinstance(answer, AgentAction) and self.state.messages:
        # For tool actions the result lives in the trailing message.
        tail = self.state.messages[-1]
        if tail.get("role") == "tool" or tail.get("role") == "assistant":
            result = str(tail.get("content", ""))

    self.state.todos.mark_completed(active.step_number, result=result)

    if self.agent.verbose:
        done = self.state.todos.completed_count
        total = len(self.state.todos.items)
        self._printer.print(
            content=f"✓ Todo {active.step_number} completed ({done}/{total})",
            color="green",
        )
        self._printer.print(
            content=f"[DEBUG] Marked todo {active.step_number} as completed, result_len={len(result)}",
            color="cyan",
        )
    return "todo_marked"
@router(mark_todo_complete)
def check_more_todos(
    self,
) -> Literal["has_todos", "all_todos_complete", "needs_replan"]:
    """Route after a todo completes: replan, finish, or keep going."""
    verbose = self.agent.verbose

    # DEBUG: Trace checking for more todos
    if verbose:
        self._printer.print(
            content=f"[DEBUG] check_more_todos: is_complete={self.state.todos.is_complete}",
            color="cyan",
        )
        for todo in self.state.todos.items:
            self._printer.print(
                content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}",
                color="cyan",
            )

    # Replanning wins over both completion and continuation.
    should_replan, reason = self._should_replan()
    if should_replan:
        self.state.last_replan_reason = reason
        if verbose:
            self._printer.print(
                content=f"[DEBUG] Replanning needed: {reason} -> needs_replan",
                color="cyan",
            )
        return "needs_replan"

    if self.state.todos.is_complete:
        if verbose:
            self._printer.print(
                content="[DEBUG] All todos complete -> all_todos_complete",
                color="cyan",
            )
        return "all_todos_complete"

    if verbose:
        self._printer.print(
            content="[DEBUG] More todos to execute -> has_todos",
            color="cyan",
        )
    return "has_todos"
@router("todo_not_satisfied")
def increment_and_continue(self) -> Literal["initialized"]:
    """Increment iteration counter and loop back for next iteration.

    Called when a tool execution didn't satisfy the current todo,
    allowing the agent to continue working on it.
    """
    # The duplicated pre-change docstring line left over from the diff
    # has been removed; only the current docstring is kept.
    self.state.iterations += 1
    return "initialized"
@listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
@listen(
or_(
"all_todos_complete",
"agent_finished",
"tool_result_is_final",
"native_finished",
)
)
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
"""Finalize execution and emit completion logs.
If todos were used, synthesizes a final answer from all todo results.
"""
# DEBUG: Trace finalize being called
if self.agent.verbose:
self._printer.print(
content=f"[DEBUG] finalize called! todos_count={len(self.state.todos.items)}, todos_complete={self.state.todos.is_complete}",
color="magenta",
)
if self.state.todos.items:
for todo in self.state.todos.items:
self._printer.print(
content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}, desc={todo.description[:40]}...",
color="magenta",
)
# If we have completed todos, synthesize the final answer
if self.state.todos.items and self.state.todos.is_complete:
self._synthesize_final_answer_from_todos()
if self.state.current_answer is None:
skip_text = Text()
skip_text.append("⚠️ ", style="yellow bold")
@@ -982,7 +1543,239 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "completed"
@listen("parser_error")
def _synthesize_final_answer_from_todos(self) -> None:
    """Combine all todo results into a final answer.

    Builds an AgentFinish whose output concatenates each todo's
    description and result; leaves current_answer untouched when no
    todo produced a result.
    """
    sections: list[str] = []
    for todo in self.state.todos.items:
        if not todo.result:
            continue
        sections.append(f"**Step {todo.step_number}**: {todo.description}")
        sections.append(todo.result)
        sections.append("")  # Empty line for spacing

    if sections:
        combined = "\n".join(sections)
        self.state.current_answer = AgentFinish(
            thought="All planned steps completed successfully",
            output=combined,
            text=combined,
        )
# -------------------------------------------------------------------------
# Dynamic Replanning Methods
# -------------------------------------------------------------------------
def _should_replan(self) -> tuple[bool, str]:
    """Determine if dynamic replanning is needed.

    Replanning is warranted when several todos failed or errored, or
    when the agent's latest message hints that the plan is not working.
    Attempts are capped at a fixed maximum.

    Returns:
        Tuple of (should_replan: bool, reason: str).
    """
    max_replans = 3  # Hard cap on replanning attempts.
    if self.state.replan_count >= max_replans:
        return False, "Max replan attempts reached"

    items = self.state.todos.items

    # Two or more explicit failures warrant a new plan.
    failed_todos = [t for t in items if t.status == "failed"]
    if len(failed_todos) >= 2:
        return True, f"Multiple todos failed ({len(failed_todos)} failures)"

    # Results recorded as "Error: ..." count as soft failures.
    error_todos = [t for t in items if t.result and t.result.startswith("Error:")]
    if len(error_todos) >= 2:
        return (
            True,
            f"Multiple todos encountered errors ({len(error_todos)} errors)",
        )

    # Scan the agent's most recent message for replan hints.
    if self.state.messages:
        content = str(self.state.messages[-1].get("content", "")).lower()
        replan_indicators = (
            "need to reconsider",
            "approach isn't working",
            "try a different approach",
            "replan",
            "revise the plan",
            "plan needs adjustment",
        )
        for indicator in replan_indicators:
            if indicator in content:
                return True, f"Agent indicated replanning needed: '{indicator}'"

    return False, ""
def _trigger_replan(self, reason: str) -> None:
    """Trigger dynamic replanning with accumulated context.

    Regenerates the execution plan based on what has been learned from
    previous attempts, including failures and partial results. On any
    error the existing todos are kept and the failure is recorded in
    ``last_replan_reason``.

    Args:
        reason: The reason for triggering the replan.
    """
    self.state.replan_count += 1
    self.state.last_replan_reason = reason

    if self.agent.verbose:
        self._printer.print(
            content=f"Triggering replan (attempt {self.state.replan_count}): {reason}",
            color="yellow",
        )

    # Build context from previous execution attempts
    previous_context = self._build_replan_context()

    try:
        from crewai.utilities.reasoning_handler import AgentReasoning

        if self.task:
            planning_handler = AgentReasoning(agent=self.agent, task=self.task)
        else:
            input_text = getattr(self, "_kickoff_input", "")
            planning_handler = AgentReasoning(
                agent=self.agent,
                description=input_text or "Complete the requested task",
                expected_output="Complete the task successfully",
            )

        # Include previous context in the planning request so the
        # planner can learn from past failures.
        enhanced_description = self._enhance_task_for_replan(previous_context)
        if self.task:
            original_description = self.task.description
            self.task.description = enhanced_description
            # BUGFIX: restore the original description even when the
            # reasoning call raises, so the task is never left mutated.
            try:
                output = planning_handler.handle_agent_reasoning()
            finally:
                self.task.description = original_description
        else:
            planning_handler.description = enhanced_description
            output = planning_handler.handle_agent_reasoning()

        # Reset todos with new plan
        self.state.plan = output.plan.plan
        self.state.plan_ready = output.plan.ready
        if self.state.plan_ready and output.plan.steps:
            self._create_todos_from_plan(output.plan.steps)
            if self.agent.verbose:
                self._printer.print(
                    content=f"New plan created with {len(output.plan.steps)} steps",
                    color="green",
                )
    except Exception as e:
        if hasattr(self.agent, "_logger"):
            self.agent._logger.log("error", f"Error during replanning: {e!s}")
        # Keep existing todos if replanning fails
        self.state.last_replan_reason = f"Replan failed: {e!s}"
def _build_replan_context(self) -> str:
    """Build context from previous execution for replanning.

    Summarizes completed steps (with truncated results), failed or
    errored steps, and any prior replan history.

    Returns:
        A context string describing previous execution state.
    """
    parts: list[str] = []
    items = self.state.todos.items

    # Successful steps, with results capped at 200 characters.
    completed = [t for t in items if t.status == "completed"]
    if completed:
        parts.append("Successfully completed steps:")
        for todo in completed:
            if todo.result and len(todo.result) > 200:
                result_preview = todo.result[:200] + "..."
            else:
                result_preview = todo.result
            parts.append(f" - Step {todo.step_number}: {todo.description}")
            if result_preview:
                parts.append(f" Result: {result_preview}")

    # Steps that failed outright or recorded an "Error: ..." result.
    failed = [
        t
        for t in items
        if t.status == "failed" or (t.result and t.result.startswith("Error:"))
    ]
    if failed:
        parts.append("\nFailed or errored steps:")
        for todo in failed:
            parts.append(f" - Step {todo.step_number}: {todo.description}")
            if todo.result:
                parts.append(f" Error: {todo.result}")

    # Replan history, if any.
    if self.state.replan_count > 0:
        parts.append(f"\nThis is replan attempt {self.state.replan_count}.")
        if self.state.last_replan_reason:
            parts.append(
                f"Previous replan reason: {self.state.last_replan_reason}"
            )

    return "\n".join(parts)
def _enhance_task_for_replan(self, previous_context: str) -> str:
    """Enhance task description with context for replanning.

    Args:
        previous_context: Context from previous execution attempts.

    Returns:
        Enhanced task description for the planner.
    """
    if self.task:
        base_description = self.task.description
    else:
        base_description = getattr(self, "_kickoff_input", "")
    return f"""{base_description}
IMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan
that accounts for the following context from the previous attempt:
{previous_context}
Consider:
1. What steps succeeded and can be built upon
2. What steps failed and why they might have failed
3. Alternative approaches that might work better
4. Whether dependencies need to be restructured"""
@router("needs_replan")
def handle_replan(self) -> Literal["has_todos", "no_todos"]:
    """Regenerate the plan and re-enter todo-driven execution.

    Routes to "no_todos" (standard execution) when replanning produced
    no todo items.
    """
    replan_reason = self.state.last_replan_reason or "Dynamic replan triggered"
    self._trigger_replan(replan_reason)
    return "has_todos" if self.state.todos.items else "no_todos"
@router("parser_error")
def recover_from_parser_error(self) -> Literal["initialized"]:
"""Recover from output parser errors and retry."""
if not self._last_parser_error:
@@ -1005,7 +1798,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized"
@listen("context_error")
@router("context_error")
def recover_from_context_length(self) -> Literal["initialized"]:
"""Recover from context length errors and retry."""
handle_context_length(
@@ -1062,6 +1855,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.pending_tool_calls = []
self.state.plan = None
self.state.plan_ready = False
self.state.todos = TodoList()
self.state.replan_count = 0
self.state.last_replan_reason = None
self._kickoff_input = inputs.get("input", "")
@@ -1150,6 +1946,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.pending_tool_calls = []
self.state.plan = None
self.state.plan_ready = False
self.state.todos = TodoList()
self.state.replan_count = 0
self.state.last_replan_reason = None
self._kickoff_input = inputs.get("input", "")

View File

@@ -6,9 +6,27 @@ from typing import Any
from pydantic import BaseModel, Field
from crewai.utilities.planning_types import TodoItem
from crewai.utilities.types import LLMMessage
class TodoExecutionResult(BaseModel):
    """Summary of a single todo execution.

    Immutable snapshot of a plan step's outcome, attached to
    LiteAgentOutput so callers can inspect what was executed.
    """

    # 1-based position of this step within the generated plan.
    step_number: int = Field(description="Step number in the plan")
    description: str = Field(description="What the todo was supposed to do")
    tool_used: str | None = Field(
        default=None, description="Tool that was used for this step"
    )
    # NOTE(review): free-form status string; values seen in this diff are
    # "completed", "failed", "pending" and "running" — confirm the full set.
    status: str = Field(description="Final status: completed, failed, pending")
    result: str | None = Field(
        default=None, description="Result or error message from execution"
    )
    depends_on: list[int] = Field(
        default_factory=list, description="Step numbers this depended on"
    )
class LiteAgentOutput(BaseModel):
"""Class that represents the result of a LiteAgent execution."""
@@ -24,12 +42,75 @@ class LiteAgentOutput(BaseModel):
)
messages: list[LLMMessage] = Field(description="Messages of the agent", default=[])
plan: str | None = Field(
default=None, description="The execution plan that was generated, if any"
)
todos: list[TodoExecutionResult] = Field(
default_factory=list,
description="List of todos that were executed with their results",
)
replan_count: int = Field(
default=0, description="Number of times the plan was regenerated"
)
last_replan_reason: str | None = Field(
default=None, description="Reason for the last replan, if any"
)
@classmethod
def from_todo_items(cls, todo_items: list[TodoItem]) -> list[TodoExecutionResult]:
    """Convert TodoItem objects to TodoExecutionResult summaries.

    Args:
        todo_items: List of TodoItem objects from execution.

    Returns:
        List of TodoExecutionResult summaries.
    """
    summaries: list[TodoExecutionResult] = []
    for item in todo_items:
        summaries.append(
            TodoExecutionResult(
                step_number=item.step_number,
                description=item.description,
                tool_used=item.tool_to_use,
                status=item.status,
                result=item.result,
                depends_on=item.depends_on,
            )
        )
    return summaries
def to_dict(self) -> dict[str, Any]:
    """Return the pydantic output as a dict, or an empty dict when unset."""
    return self.pydantic.model_dump() if self.pydantic else {}
@property
def completed_todos(self) -> list[TodoExecutionResult]:
    """Todos whose final status is "completed"."""
    return list(filter(lambda todo: todo.status == "completed", self.todos))
@property
def failed_todos(self) -> list[TodoExecutionResult]:
    """Todos whose final status is "failed"."""
    return list(filter(lambda todo: todo.status == "failed", self.todos))
@property
def had_plan(self) -> bool:
    """True when a plan was generated or any todos were produced."""
    if self.plan is not None:
        return True
    return len(self.todos) > 0
def __str__(self) -> str:
    """Return the raw output as a string."""
    # str(output) yields the raw LLM text, not the structured fields.
    return self.raw
def __repr__(self) -> str:
    """Return a detailed representation including todo summary."""
    text = f"LiteAgentOutput(role={self.agent_role!r}"
    if self.todos:
        # Summarize progress as completed/total.
        text += f", todos={len(self.completed_todos)}/{len(self.todos)} completed"
    if self.replan_count > 0:
        text += f", replans={self.replan_count}"
    return text + ")"

View File

@@ -101,3 +101,46 @@ class TodoList(BaseModel):
item.status = "completed"
if result:
item.result = result
def _dependencies_satisfied(self, item: TodoItem) -> bool:
    """Check if all dependencies for a todo item are completed.

    Args:
        item: The todo item to check dependencies for.

    Returns:
        True if all dependencies are completed, False otherwise.
    """
    def _is_done(step_number: int) -> bool:
        dep = self.get_by_step_number(step_number)
        return dep is not None and dep.status == "completed"

    return all(_is_done(step) for step in item.depends_on)
def get_ready_todos(self) -> list[TodoItem]:
    """Get all todos that are ready to execute (pending with satisfied dependencies).

    Returns:
        List of TodoItem objects that can be executed now.
    """
    return [
        item
        for item in self.items
        if item.status == "pending" and self._dependencies_satisfied(item)
    ]
@property
def can_parallelize(self) -> bool:
    """Check if multiple todos can run in parallel.

    Returns:
        True if more than one todo is ready to execute.
    """
    ready = self.get_ready_todos()
    return len(ready) > 1
@property
def running_count(self) -> int:
    """Count of currently running todos."""
    return len([item for item in self.items if item.status == "running"])