diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py
index 660610e67..2ec06307d 100644
--- a/lib/crewai/src/crewai/agent/core.py
+++ b/lib/crewai/src/crewai/agent/core.py
@@ -1973,12 +1973,19 @@ class Agent(BaseAgent):
             else str(raw_output)
         )
 
+        # Extract todo execution results from executor state
+        todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
+
         return LiteAgentOutput(
             raw=raw_str,
             pydantic=formatted_result,
             agent_role=self.role,
             usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
             messages=executor.messages,
+            plan=executor.state.plan,
+            todos=todo_results,
+            replan_count=executor.state.replan_count,
+            last_replan_reason=executor.state.last_replan_reason,
         )
 
     async def _execute_and_build_output_async(
@@ -2051,12 +2058,19 @@ class Agent(BaseAgent):
             else str(raw_output)
         )
 
+        # Extract todo execution results from executor state
+        todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
+
         return LiteAgentOutput(
             raw=raw_str,
             pydantic=formatted_result,
             agent_role=self.role,
             usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
             messages=executor.messages,
+            plan=executor.state.plan,
+            todos=todo_results,
+            replan_count=executor.state.replan_count,
+            last_replan_reason=executor.state.last_replan_reason,
        )
 
     def _process_kickoff_guardrail(
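A minimal sketch of what the enriched output enables downstream. The agent configuration here is illustrative, `planning_enabled` is the flag this patch gates todo routing on, and `Agent.kickoff()` is assumed to return a `LiteAgentOutput` as the hunks above imply:

# Sketch (assumptions noted above): inspecting the new planning fields after a run.
from crewai import Agent

agent = Agent(
    role="Research Analyst",
    goal="Produce a sourced summary",
    backstory="Experienced analyst.",
    planning_enabled=True,  # assumed flag; todo routing is gated on it below
)

output = agent.kickoff("Summarize this week's AI research news")

print(output.plan)          # the generated plan text, if planning ran
print(output.replan_count)  # how many times the plan was regenerated
for todo in output.todos:   # per-step TodoExecutionResult summaries
    print(todo.step_number, todo.status, todo.tool_used, todo.result)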
diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py
index e22b8c129..adee3a8c1 100644
--- a/lib/crewai/src/crewai/experimental/agent_executor.py
+++ b/lib/crewai/src/crewai/experimental/agent_executor.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 from collections.abc import Callable, Coroutine
 from datetime import datetime
 import json
@@ -102,6 +103,12 @@ class AgentReActState(BaseModel):
     todos: TodoList = Field(
         default_factory=TodoList, description="Todo list for tracking plan execution"
     )
+    replan_count: int = Field(
+        default=0, description="Number of times the plan has been regenerated"
+    )
+    last_replan_reason: str | None = Field(
+        default=None, description="Reason for the last replan, if any"
+    )
 
 
 class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@@ -389,9 +396,278 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
 
         self.state.todos = TodoList(items=todos)
 
-    @listen(generate_plan)
+    # -------------------------------------------------------------------------
+    # Todo-Driven Execution Flow
+    # -------------------------------------------------------------------------
+
+    @router(generate_plan)
+    def check_todos_available(
+        self,
+    ) -> Literal["has_todos", "no_todos", "planning_disabled"]:
+        """Check if todos were created from planning.
+
+        Routes to todo-driven execution if todos exist, otherwise falls back
+        to the standard execution flow.
+        """
+        if not getattr(self.agent, "planning_enabled", False):
+            return "planning_disabled"
+        if not self.state.todos.items:
+            return "no_todos"
+        return "has_todos"
+
+    @router("has_todos")
+    def get_ready_todos_method(
+        self,
+    ) -> Literal["single_todo_ready", "multiple_todos_ready", "all_todos_complete"]:
+        """Find todos whose dependencies are satisfied.
+
+        Determines if we can execute a single todo sequentially or multiple
+        todos in parallel.
+        """
+        ready = self.state.todos.get_ready_todos()
+
+        # DEBUG: Trace todo readiness
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] get_ready_todos_method: found {len(ready)} ready todos",
+                color="cyan",
+            )
+            for todo in self.state.todos.items:
+                self._printer.print(
+                    content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}, desc={todo.description[:50]}...",
+                    color="cyan",
+                )
+
+        if not ready:
+            return "all_todos_complete"
+
+        if len(ready) == 1:
+            # Mark the single ready todo as running
+            self.state.todos.mark_running(ready[0].step_number)
+            if self.agent.verbose:
+                self._printer.print(
+                    content=f"[DEBUG] Marked todo {ready[0].step_number} as running -> single_todo_ready",
+                    color="cyan",
+                )
+            return "single_todo_ready"
+
+        # Multiple todos ready - can parallelize
+        if self.agent.verbose:
+            self._printer.print(
+                content="[DEBUG] Multiple todos ready -> multiple_todos_ready",
+                color="cyan",
+            )
+        return "multiple_todos_ready"
+
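For reference, the three routes above follow directly from `get_ready_todos()`, which is added to `TodoList` later in this patch. A reduced sketch, assuming `TodoItem` accepts these fields and defaults new items to `pending` status:

# Sketch: how readiness maps to the router's three events.
from crewai.utilities.planning_types import TodoItem, TodoList

def route(todos: TodoList) -> str:
    ready = todos.get_ready_todos()
    if not ready:
        return "all_todos_complete"
    if len(ready) == 1:
        return "single_todo_ready"
    return "multiple_todos_ready"

todos = TodoList(items=[
    TodoItem(step_number=1, description="Search the web"),
    TodoItem(step_number=2, description="Summarize findings", depends_on=[1]),
])
print(route(todos))  # single_todo_ready: only step 1 has no unmet dependencies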
+    @router("single_todo_ready")
+    def execute_todo_sequential(self) -> Literal["todo_injected"]:
+        """Prepare to execute a single todo by injecting its context.
+
+        Adds a focused prompt for the current todo to the conversation,
+        guiding the agent to complete this specific step.
+        """
+        current = self.state.todos.current_todo
+
+        # DEBUG: Trace starting todo execution
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] execute_todo_sequential: starting todo {current.step_number if current else None}",
+                color="cyan",
+            )
+            if current:
+                self._printer.print(
+                    content=f"[DEBUG] Description: {current.description[:60]}...",
+                    color="cyan",
+                )
+
+        if current:
+            self._inject_todo_context(current)
+        return "todo_injected"
+
+    def _inject_todo_context(self, todo: TodoItem) -> None:
+        """Inject todo-specific context into the conversation.
+
+        Args:
+            todo: The todo item to inject context for.
+        """
+        prompt = self._build_todo_prompt(todo)
+        todo_message: LLMMessage = {
+            "role": "user",
+            "content": prompt,
+        }
+        self.state.messages.append(todo_message)
+
+    def _build_todo_prompt(self, todo: TodoItem) -> str:
+        """Build a focused prompt for executing a single todo.
+
+        Args:
+            todo: The todo item to build a prompt for.
+
+        Returns:
+            A prompt string focused on this specific step.
+        """
+        total = len(self.state.todos.items)
+        parts = [f"**Current Step {todo.step_number}/{total}**"]
+        parts.append(f"Task: {todo.description}")
+
+        if todo.tool_to_use:
+            parts.append(f"Suggested tool: {todo.tool_to_use}")
+
+        # Include results from completed dependencies
+        if todo.depends_on:
+            dep_results = []
+            for dep_num in todo.depends_on:
+                dep = self.state.todos.get_by_step_number(dep_num)
+                if dep and dep.result:
+                    # Truncate long results
+                    result_preview = (
+                        dep.result[:500] + "..."
+                        if len(dep.result) > 500
+                        else dep.result
+                    )
+                    dep_results.append(f"Step {dep_num} result: {result_preview}")
+            if dep_results:
+                parts.append("\nContext from previous steps:")
+                parts.extend(dep_results)
+
+        parts.append("\nComplete this step. Once done, provide your result.")
+        return "\n".join(parts)
+
+    @router("multiple_todos_ready")
+    async def execute_todos_parallel(self) -> Literal["parallel_todos_complete"]:
+        """Execute multiple independent todos concurrently.
+
+        When multiple todos have their dependencies satisfied, they can
+        run in parallel for efficiency.
+        """
+        ready = self.state.todos.get_ready_todos()
+
+        # Mark all ready todos as running
+        for todo in ready:
+            self.state.todos.mark_running(todo.step_number)
+
+        # Execute each todo in parallel
+        tasks = [self._execute_single_todo_async(todo) for todo in ready]
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # Store results and mark completed
+        for todo, result in zip(ready, results, strict=True):
+            if isinstance(result, Exception):
+                error_msg = f"Error: {result!s}"
+                self.state.todos.mark_completed(todo.step_number, result=error_msg)
+                if self.agent.verbose:
+                    self._printer.print(
+                        content=f"Todo {todo.step_number} failed: {error_msg}",
+                        color="red",
+                    )
+            else:
+                self.state.todos.mark_completed(todo.step_number, result=str(result))
+                if self.agent.verbose:
+                    self._printer.print(
+                        content=f"Todo {todo.step_number} completed",
+                        color="green",
+                    )
+
+        return "parallel_todos_complete"
+
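The fan-out/fan-in pattern used above, isolated into a self-contained, runnable sketch (the step names are made up): `return_exceptions=True` keeps one failed step from cancelling its siblings, which is what lets the loop record per-todo error results instead of raising.

# Sketch: gather with return_exceptions, as used by execute_todos_parallel.
import asyncio

async def run_step(n: int) -> str:
    if n == 2:
        raise ValueError("boom")  # simulate one step failing
    return f"result {n}"

async def main() -> None:
    results = await asyncio.gather(
        *(run_step(n) for n in (1, 2, 3)), return_exceptions=True
    )
    for n, result in zip((1, 2, 3), results, strict=True):
        if isinstance(result, Exception):
            print(f"step {n} failed: {result}")  # stored as an error result
        else:
            print(f"step {n} ok: {result}")

asyncio.run(main())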
+    async def _execute_single_todo_async(self, todo: TodoItem) -> str:
+        """Execute a single todo item asynchronously.
+
+        Args:
+            todo: The todo item to execute.
+
+        Returns:
+            The result of executing the todo.
+        """
+        # Build messages for this specific todo
+        todo_prompt = self._build_todo_prompt(todo)
+        messages: list[LLMMessage] = [
+            {"role": "system", "content": self._get_todo_system_prompt()},
+            {"role": "user", "content": todo_prompt},
+        ]
+
+        # If the todo specifies a tool and we have native tool support
+        if todo.tool_to_use and self.state.use_native_tools:
+            try:
+                response = await asyncio.to_thread(
+                    self.llm.call,
+                    messages,
+                    tools=self._openai_tools,
+                    available_functions=self._available_functions,
+                )
+
+                # Handle tool calls if returned
+                if isinstance(response, list) and response:
+                    # Execute the tool call
+                    tool_results = []
+                    for tool_call in response:
+                        info = extract_tool_call_info(tool_call)
+                        if info:
+                            _call_id, func_name, func_args = info
+                            if func_name in self._available_functions:
+                                if isinstance(func_args, str):
+                                    try:
+                                        args_dict = json.loads(func_args)
+                                    except json.JSONDecodeError:
+                                        args_dict = {}
+                                else:
+                                    args_dict = func_args
+                                tool_func = self._available_functions[func_name]
+                                result = tool_func(**args_dict)
+                                tool_results.append(str(result))
+                    return "\n".join(tool_results) if tool_results else str(response)
+
+                return str(response)
+            except Exception as e:
+                return f"Tool execution error: {e!s}"
+
+        # Standard LLM call without tools
+        try:
+            response = await asyncio.to_thread(self.llm.call, messages)
+            return str(response)
+        except Exception as e:
+            return f"LLM call error: {e!s}"
+
+    def _get_todo_system_prompt(self) -> str:
+        """Get the system prompt for todo execution.
+
+        Returns:
+            A system prompt for focused step execution.
+        """
+        role = self.agent.role if self.agent else "Assistant"
+        goal = self.agent.goal if self.agent else "Complete tasks efficiently"
+
+        return f"""You are {role}. Your goal: {goal}
+
+You are executing a specific step in a multi-step plan. Focus only on completing
+the current step. Use the suggested tool if one is provided. Be concise and
+provide clear results that can be used by subsequent steps."""
+
+    @router("parallel_todos_complete")
+    def after_parallel_execution(
+        self,
+    ) -> Literal["has_todos", "all_todos_complete", "needs_replan"]:
+        """Check for more todos after parallel execution completes.
+
+        Also checks if replanning is needed based on execution results.
+        """
+        # Check if replanning is needed before continuing
+        should_replan, reason = self._should_replan()
+        if should_replan:
+            self.state.last_replan_reason = reason
+            return "needs_replan"
+
+        if self.state.todos.is_complete:
+            return "all_todos_complete"
+        return "has_todos"
+
+    @router(or_("todo_injected", "no_todos", "planning_disabled"))
     def initialize_reasoning(self) -> Literal["initialized"]:
-        """Initialize the reasoning flow and emit agent start logs."""
+        """Initialize the reasoning flow and emit agent start logs.
+
+        This is called either after todo context is injected, or when
+        there are no todos (falling back to standard execution).
+        """
         self._show_start_logs()
 
         # Check for native tool support on first iteration
         if self.state.iterations == 0:
@@ -400,7 +676,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
             self._setup_native_tools()
         return "initialized"
 
-    @listen("force_final_answer")
+    @router("force_final_answer")
     def force_final_answer(self) -> Literal["agent_finished"]:
         """Force agent to provide final answer when max iterations exceeded."""
         formatted_answer = handle_max_iterations_exceeded(
@@ -418,7 +694,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
 
         return "agent_finished"
 
-    @listen("continue_reasoning")
+    @router("continue_reasoning")
     def call_llm_and_parse(self) -> Literal["parsed", "parser_error", "context_error"]:
         """Execute LLM call with hooks and parse the response.
""" try: @@ -534,7 +815,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): ) self._invoke_step_callback(self.state.current_answer) self._append_message_to_state(answer.model_dump_json()) - return "native_finished" + return self._route_finish_with_todos("native_finished") # Text response - this is the final answer if isinstance(answer, str): @@ -546,7 +827,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._invoke_step_callback(self.state.current_answer) self._append_message_to_state(answer) - return "native_finished" + return self._route_finish_with_todos("native_finished") # Unexpected response type, treat as final answer self.state.current_answer = AgentFinish( @@ -557,7 +838,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._invoke_step_callback(self.state.current_answer) self._append_message_to_state(str(answer)) - return "native_finished" + return self._route_finish_with_todos("native_finished") except Exception as e: if is_context_length_exceeded(e): @@ -568,14 +849,60 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): handle_unknown_error(self._printer, e, verbose=self.agent.verbose) raise + def _route_finish_with_todos( + self, default_route: str + ) -> Literal["native_finished", "agent_finished", "todo_satisfied"]: + """Helper to route finish events, checking for pending todos first. + + If there are pending todos, route to todo_satisfied instead of the + default finish event to continue processing todos. + + Args: + default_route: The default route to use if no todos are pending. + + Returns: + "todo_satisfied" if todos need processing, otherwise the default route. + """ + if self.state.todos.items and not self.state.todos.is_complete: + current_todo = self.state.todos.current_todo + if current_todo: + if self.agent.verbose: + self._printer.print( + content=f"[DEBUG] Finish with pending todos -> treating as todo_satisfied for todo {current_todo.step_number}", + color="cyan", + ) + return "todo_satisfied" + return default_route # type: ignore[return-value] + @router(call_llm_and_parse) - def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]: - """Route based on whether answer is AgentAction or AgentFinish.""" + def route_by_answer_type( + self, + ) -> Literal["execute_tool", "agent_finished", "todo_satisfied"]: + """Route based on whether answer is AgentAction or AgentFinish. + + When todos are active and the LLM produces a final answer, we treat it + as completing the current todo rather than finishing the entire task. 
@@ -641,7 +968,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
                 self._console.print(error_text)
             raise
 
-    @listen("native_tool_calls")
+    @router("native_tool_calls")
     def execute_native_tool(
         self,
     ) -> Literal["native_tool_completed", "tool_result_is_final"]:
@@ -925,10 +1252,50 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
         return "unknown"
 
     @router(execute_native_tool)
-    def increment_native_and_continue(self) -> Literal["initialized"]:
-        """Increment iteration counter after native tool execution."""
-        self.state.iterations += 1
-        return "initialized"
+    def check_native_todo_completion(
+        self,
+    ) -> Literal["todo_satisfied", "todo_not_satisfied"]:
+        """Check if the native tool execution satisfied the active todo.
+
+        Similar to check_todo_completion, but for the native tool execution path.
+        """
+        current_todo = self.state.todos.current_todo
+
+        # DEBUG: Trace native todo completion check
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] check_native_todo_completion: current_todo={current_todo.step_number if current_todo else None}",
+                color="cyan",
+            )
+
+        if not current_todo:
+            # No active todo, continue with normal iteration
+            if self.agent.verbose:
+                self._printer.print(
+                    content="[DEBUG] No current todo -> todo_not_satisfied",
+                    color="cyan",
+                )
+            return "todo_not_satisfied"
+
+        # For native tools, any tool execution satisfies the todo;
+        # tool name matching is handled by native tool execution
+        if current_todo.tool_to_use:
+            # For simplicity, any tool execution counts when there's a current todo
+            if self.agent.verbose:
+                self._printer.print(
+                    content=f"[DEBUG] Native tool execution for todo {current_todo.step_number} -> todo_satisfied",
+                    color="cyan",
+                )
+            return "todo_satisfied"
+
+        # Any tool use counts when no specific tool is required
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] Any native tool use counts for todo {current_todo.step_number} -> todo_satisfied",
+                color="cyan",
+            )
+        return "todo_satisfied"
 
     @listen("initialized")
     def continue_iteration(self) -> Literal["check_iteration"]:
@@ -949,14 +1316,208 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
         return "continue_reasoning"
 
     @router(execute_tool_action)
+    def check_todo_completion(
+        self,
+    ) -> Literal["todo_satisfied", "todo_not_satisfied"]:
+        """Check if the current tool execution satisfied the active todo.
+
+        After a tool is executed, this determines if the current todo
+        should be marked as complete based on whether:
+        1. The expected tool was used (if specified)
+        2. The agent returned a final answer for this step
+        """
+        current_todo = self.state.todos.current_todo
+
+        # DEBUG: Trace todo completion check
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] check_todo_completion: current_todo={current_todo.step_number if current_todo else None}, answer_type={type(self.state.current_answer).__name__}",
+                color="cyan",
+            )
+
+        if not current_todo:
+            # No active todo, continue with normal iteration
+            if self.agent.verbose:
+                self._printer.print(
+                    content="[DEBUG] No current todo -> todo_not_satisfied",
+                    color="cyan",
+                )
+            return "todo_not_satisfied"
+
+        action = self.state.current_answer
+
+        # Check if the expected tool was used
+        if isinstance(action, AgentAction):
+            if current_todo.tool_to_use:
+                # Check if the tool used matches the expected tool
+                if action.tool == current_todo.tool_to_use:
+                    if self.agent.verbose:
+                        self._printer.print(
+                            content=f"[DEBUG] Expected tool {current_todo.tool_to_use} matched -> todo_satisfied",
+                            color="cyan",
+                        )
+                    return "todo_satisfied"
+            else:
+                # No specific tool expected, any tool use counts
+                if self.agent.verbose:
+                    self._printer.print(
+                        content=f"[DEBUG] Any tool use counts (used {action.tool}) -> todo_satisfied",
+                        color="cyan",
+                    )
+                return "todo_satisfied"
+
+        # Check if we got a final answer for this step
+        if isinstance(action, AgentFinish):
+            if self.agent.verbose:
+                self._printer.print(
+                    content="[DEBUG] AgentFinish received -> todo_satisfied",
+                    color="cyan",
+                )
+            return "todo_satisfied"
+
+        if self.agent.verbose:
+            self._printer.print(
+                content="[DEBUG] No satisfaction condition met -> todo_not_satisfied",
+                color="cyan",
+            )
+        return "todo_not_satisfied"
+
+    @listen("todo_satisfied")
+    def mark_todo_complete(self) -> Literal["todo_marked"]:
+        """Mark the current todo as completed with its result."""
+        current_todo = self.state.todos.current_todo
+
+        # DEBUG: Trace marking todo complete
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] mark_todo_complete called: current_todo={current_todo.step_number if current_todo else None}",
+                color="cyan",
+            )
+
+        if not current_todo:
+            if self.agent.verbose:
+                self._printer.print(
+                    content="[DEBUG] No current todo to mark -> todo_marked",
+                    color="cyan",
+                )
+            return "todo_marked"
+
+        # Extract result from the current answer
+        result = ""
+        if isinstance(self.state.current_answer, AgentFinish):
+            result = str(self.state.current_answer.output)
+        elif isinstance(self.state.current_answer, AgentAction):
+            # Use the tool result (the last message should contain it)
+            if self.state.messages:
+                last_msg = self.state.messages[-1]
+                if (
+                    last_msg.get("role") == "tool"
+                    or last_msg.get("role") == "assistant"
+                ):
+                    result = str(last_msg.get("content", ""))
+
+        self.state.todos.mark_completed(current_todo.step_number, result=result)
+
+        if self.agent.verbose:
+            completed = self.state.todos.completed_count
+            total = len(self.state.todos.items)
+            self._printer.print(
+                content=f"✓ Todo {current_todo.step_number} completed ({completed}/{total})",
+                color="green",
+            )
+            self._printer.print(
+                content=f"[DEBUG] Marked todo {current_todo.step_number} as completed, result_len={len(result)}",
+                color="cyan",
+            )
+
+        return "todo_marked"
+
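The satisfaction rule above can be read as a three-case predicate. A reduced, runnable sketch (the function and its parameters are mine, not part of the patch):

# Sketch: the completion rule applied by check_todo_completion.
def todo_satisfied(expected_tool: str | None, used_tool: str | None,
                   finished: bool) -> bool:
    if finished:                       # AgentFinish always satisfies the step
        return True
    if used_tool is None:              # no tool ran, nothing to credit
        return False
    if expected_tool is None:          # no suggested tool: any tool use counts
        return True
    return used_tool == expected_tool  # otherwise the suggested tool must match

assert todo_satisfied("search", "search", finished=False)
assert not todo_satisfied("search", "scrape", finished=False)
assert todo_satisfied("search", None, finished=True)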
+    @router(mark_todo_complete)
+    def check_more_todos(
+        self,
+    ) -> Literal["has_todos", "all_todos_complete", "needs_replan"]:
+        """Check if there are more todos to execute after marking one complete.
+
+        Also checks if replanning is needed based on execution results.
+        """
+        # DEBUG: Trace checking for more todos
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] check_more_todos: is_complete={self.state.todos.is_complete}",
+                color="cyan",
+            )
+            for todo in self.state.todos.items:
+                self._printer.print(
+                    content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}",
+                    color="cyan",
+                )
+
+        # Check if replanning is needed before continuing
+        should_replan, reason = self._should_replan()
+        if should_replan:
+            self.state.last_replan_reason = reason
+            if self.agent.verbose:
+                self._printer.print(
+                    content=f"[DEBUG] Replanning needed: {reason} -> needs_replan",
+                    color="cyan",
+                )
+            return "needs_replan"
+
+        if self.state.todos.is_complete:
+            if self.agent.verbose:
+                self._printer.print(
+                    content="[DEBUG] All todos complete -> all_todos_complete",
+                    color="cyan",
+                )
+            return "all_todos_complete"
+
+        if self.agent.verbose:
+            self._printer.print(
+                content="[DEBUG] More todos to execute -> has_todos",
+                color="cyan",
+            )
+        return "has_todos"
+
+    @router("todo_not_satisfied")
     def increment_and_continue(self) -> Literal["initialized"]:
-        """Increment iteration counter and loop back for next iteration."""
+        """Increment iteration counter and loop back for next iteration.
+
+        Called when a tool execution didn't satisfy the current todo,
+        allowing the agent to continue working on it.
+        """
         self.state.iterations += 1
         return "initialized"
 
-    @listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
+    @listen(
+        or_(
+            "all_todos_complete",
+            "agent_finished",
+            "tool_result_is_final",
+            "native_finished",
+        )
+    )
     def finalize(self) -> Literal["completed", "skipped"]:
-        """Finalize execution and emit completion logs."""
+        """Finalize execution and emit completion logs.
+
+        If todos were used, synthesizes a final answer from all todo results.
+        """
+        # DEBUG: Trace finalize being called
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"[DEBUG] finalize called! todos_count={len(self.state.todos.items)}, todos_complete={self.state.todos.is_complete}",
+                color="magenta",
+            )
+            if self.state.todos.items:
+                for todo in self.state.todos.items:
+                    self._printer.print(
+                        content=f"[DEBUG] Todo {todo.step_number}: status={todo.status}, desc={todo.description[:40]}...",
+                        color="magenta",
+                    )
+
+        # If we have completed todos, synthesize the final answer
+        if self.state.todos.items and self.state.todos.is_complete:
+            self._synthesize_final_answer_from_todos()
+
         if self.state.current_answer is None:
             skip_text = Text()
             skip_text.append("⚠️ ", style="yellow bold")
@@ -982,7 +1543,239 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
 
         return "completed"
 
-    @listen("parser_error")
+    def _synthesize_final_answer_from_todos(self) -> None:
+        """Combine all todo results into a final answer.
+
+        Creates an AgentFinish from the accumulated results of all
+        completed todos.
+        """
+        results: list[str] = []
+        for todo in self.state.todos.items:
+            if todo.result:
+                results.append(f"**Step {todo.step_number}**: {todo.description}")
+                results.append(todo.result)
+                results.append("")  # Empty line for spacing
+
+        if results:
+            combined = "\n".join(results)
+            self.state.current_answer = AgentFinish(
+                thought="All planned steps completed successfully",
+                output=combined,
+                text=combined,
+            )
+
+    # -------------------------------------------------------------------------
+    # Dynamic Replanning Methods
+    # -------------------------------------------------------------------------
+
+    def _should_replan(self) -> tuple[bool, str]:
+        """Determine if dynamic replanning is needed.
+
+        Checks for conditions that warrant regenerating the execution plan:
+        1. Multiple consecutive todo failures
+        2. All todos completed but the agent indicates incomplete results
+        3. The agent explicitly requested a replan via tool or output
+
+        Returns:
+            Tuple of (should_replan: bool, reason: str)
+        """
+        max_replans = 3  # Maximum number of replanning attempts
+
+        # Don't replan if we've hit the limit
+        if self.state.replan_count >= max_replans:
+            return False, "Max replan attempts reached"
+
+        # Check for failed todos
+        failed_todos = [
+            todo for todo in self.state.todos.items if todo.status == "failed"
+        ]
+        if len(failed_todos) >= 2:
+            return True, f"Multiple todos failed ({len(failed_todos)} failures)"
+
+        # Check for todos with error results
+        error_todos = [
+            todo
+            for todo in self.state.todos.items
+            if todo.result and todo.result.startswith("Error:")
+        ]
+        if len(error_todos) >= 2:
+            return (
+                True,
+                f"Multiple todos encountered errors ({len(error_todos)} errors)",
+            )
+
+        # Check if the agent's last message indicates a need for replanning
+        if self.state.messages:
+            last_msg = self.state.messages[-1]
+            content = str(last_msg.get("content", "")).lower()
+            replan_indicators = [
+                "need to reconsider",
+                "approach isn't working",
+                "try a different approach",
+                "replan",
+                "revise the plan",
+                "plan needs adjustment",
+            ]
+            for indicator in replan_indicators:
+                if indicator in content:
+                    return True, f"Agent indicated replanning needed: '{indicator}'"
+
+        return False, ""
+
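The replan triggers reduce to a couple of threshold checks over todo state. A sketch on synthetic state (the stand-in dataclass and function are mine; field names mirror the patch):

# Sketch: the failure/error thresholds and replan cap from _should_replan.
from dataclasses import dataclass

@dataclass
class FakeTodo:
    status: str = "completed"
    result: str | None = None

def should_replan(todos: list[FakeTodo], replan_count: int,
                  max_replans: int = 3) -> tuple[bool, str]:
    if replan_count >= max_replans:
        return False, "Max replan attempts reached"
    failed = [t for t in todos if t.status == "failed"]
    if len(failed) >= 2:
        return True, f"Multiple todos failed ({len(failed)} failures)"
    errored = [t for t in todos if t.result and t.result.startswith("Error:")]
    if len(errored) >= 2:
        return True, f"Multiple todos encountered errors ({len(errored)} errors)"
    return False, ""

assert should_replan([FakeTodo(status="failed"), FakeTodo(status="failed")], 0)[0]
assert not should_replan([FakeTodo(), FakeTodo()], 0)[0]
assert not should_replan([FakeTodo(status="failed")] * 2, 3)[0]  # cap reached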
+    def _trigger_replan(self, reason: str) -> None:
+        """Trigger dynamic replanning with accumulated context.
+
+        Regenerates the execution plan based on what has been learned
+        from previous attempts, including failures and partial results.
+
+        Args:
+            reason: The reason for triggering the replan.
+        """
+        self.state.replan_count += 1
+        self.state.last_replan_reason = reason
+
+        if self.agent.verbose:
+            self._printer.print(
+                content=f"Triggering replan (attempt {self.state.replan_count}): {reason}",
+                color="yellow",
+            )
+
+        # Build context from previous execution attempts
+        previous_context = self._build_replan_context()
+
+        try:
+            from crewai.utilities.reasoning_handler import AgentReasoning
+
+            if self.task:
+                planning_handler = AgentReasoning(agent=self.agent, task=self.task)
+            else:
+                input_text = getattr(self, "_kickoff_input", "")
+                planning_handler = AgentReasoning(
+                    agent=self.agent,
+                    description=input_text or "Complete the requested task",
+                    expected_output="Complete the task successfully",
+                )
+
+            # Include previous context in the planning request;
+            # this helps the planner learn from past failures
+            enhanced_description = self._enhance_task_for_replan(previous_context)
+            if self.task:
+                original_description = self.task.description
+                self.task.description = enhanced_description
+                output = planning_handler.handle_agent_reasoning()
+                self.task.description = original_description
+            else:
+                planning_handler.description = enhanced_description
+                output = planning_handler.handle_agent_reasoning()
+
+            # Reset todos with the new plan
+            self.state.plan = output.plan.plan
+            self.state.plan_ready = output.plan.ready
+
+            if self.state.plan_ready and output.plan.steps:
+                self._create_todos_from_plan(output.plan.steps)
+
+                if self.agent.verbose:
+                    self._printer.print(
+                        content=f"New plan created with {len(output.plan.steps)} steps",
+                        color="green",
+                    )
+
+        except Exception as e:
+            if hasattr(self.agent, "_logger"):
+                self.agent._logger.log("error", f"Error during replanning: {e!s}")
+            # Keep existing todos if replanning fails
+            self.state.last_replan_reason = f"Replan failed: {e!s}"
+
+    def _build_replan_context(self) -> str:
+        """Build context from previous execution for replanning.
+
+        Summarizes what has been attempted, what failed, and what succeeded
+        to help the planner create a better plan.
+
+        Returns:
+            A context string describing previous execution state.
+        """
+        context_parts = []
+
+        # Summarize completed todos
+        completed = [t for t in self.state.todos.items if t.status == "completed"]
+        if completed:
+            context_parts.append("Successfully completed steps:")
+            for todo in completed:
+                result_preview = (
+                    todo.result[:200] + "..."
+                    if todo.result and len(todo.result) > 200
+                    else todo.result
+                )
+                context_parts.append(f"  - Step {todo.step_number}: {todo.description}")
+                if result_preview:
+                    context_parts.append(f"    Result: {result_preview}")
+
+        # Summarize failed todos
+        failed = [
+            t
+            for t in self.state.todos.items
+            if t.status == "failed" or (t.result and t.result.startswith("Error:"))
+        ]
+        if failed:
+            context_parts.append("\nFailed or errored steps:")
+            for todo in failed:
+                context_parts.append(f"  - Step {todo.step_number}: {todo.description}")
+                if todo.result:
+                    context_parts.append(f"    Error: {todo.result}")
+
+        # Add replan history
+        if self.state.replan_count > 0:
+            context_parts.append(f"\nThis is replan attempt {self.state.replan_count}.")
+            if self.state.last_replan_reason:
+                context_parts.append(
+                    f"Previous replan reason: {self.state.last_replan_reason}"
+                )
+
+        return "\n".join(context_parts)
+
+    def _enhance_task_for_replan(self, previous_context: str) -> str:
+        """Enhance the task description with context for replanning.
+
+        Args:
+            previous_context: Context from previous execution attempts.
+
+        Returns:
+            Enhanced task description for the planner.
+        """
+        original = (
+            self.task.description if self.task else getattr(self, "_kickoff_input", "")
+        )
+
+        return f"""{original}
+
+IMPORTANT: The previous execution attempt did not fully succeed. Please create a revised plan
+that accounts for the following context from the previous attempt:
+
+{previous_context}
+
+Consider:
+1. What steps succeeded and can be built upon
+2. What steps failed and why they might have failed
+3. Alternative approaches that might work better
+4. Whether dependencies need to be restructured"""
+
+    @router("needs_replan")
+    def handle_replan(self) -> Literal["has_todos", "no_todos"]:
+        """Handle a replanning request and return to todo execution.
+
+        Called when dynamic replanning is triggered. Regenerates the plan
+        and routes back to todo-driven execution.
+        """
+        reason = self.state.last_replan_reason or "Dynamic replan triggered"
+        self._trigger_replan(reason)
+
+        if self.state.todos.items:
+            return "has_todos"
+        return "no_todos"
+
+    @router("parser_error")
     def recover_from_parser_error(self) -> Literal["initialized"]:
         """Recover from output parser errors and retry."""
         if not self._last_parser_error:
@@ -1005,7 +1798,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
 
         return "initialized"
 
-    @listen("context_error")
+    @router("context_error")
     def recover_from_context_length(self) -> Literal["initialized"]:
         """Recover from context length errors and retry."""
         handle_context_length(
@@ -1062,6 +1855,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
         self.state.pending_tool_calls = []
         self.state.plan = None
         self.state.plan_ready = False
+        self.state.todos = TodoList()
+        self.state.replan_count = 0
+        self.state.last_replan_reason = None
 
         self._kickoff_input = inputs.get("input", "")
 
@@ -1150,6 +1946,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
         self.state.pending_tool_calls = []
         self.state.plan = None
         self.state.plan_ready = False
+        self.state.todos = TodoList()
+        self.state.replan_count = 0
+        self.state.last_replan_reason = None
 
         self._kickoff_input = inputs.get("input", "")
 
diff --git a/lib/crewai/src/crewai/lite_agent_output.py b/lib/crewai/src/crewai/lite_agent_output.py
index 4183dba1f..af0d51808 100644
--- a/lib/crewai/src/crewai/lite_agent_output.py
+++ b/lib/crewai/src/crewai/lite_agent_output.py
@@ -6,9 +6,27 @@ from typing import Any
 
 from pydantic import BaseModel, Field
 
+from crewai.utilities.planning_types import TodoItem
 from crewai.utilities.types import LLMMessage
 
 
+class TodoExecutionResult(BaseModel):
+    """Summary of a single todo execution."""
+
+    step_number: int = Field(description="Step number in the plan")
+    description: str = Field(description="What the todo was supposed to do")
+    tool_used: str | None = Field(
+        default=None, description="Tool that was used for this step"
+    )
+    status: str = Field(description="Final status: completed, failed, or pending")
+    result: str | None = Field(
+        default=None, description="Result or error message from execution"
+    )
+    depends_on: list[int] = Field(
+        default_factory=list, description="Step numbers this step depended on"
+    )
+
+
 class LiteAgentOutput(BaseModel):
     """Class that represents the result of a LiteAgent execution."""
 
@@ -24,12 +42,75 @@ class LiteAgentOutput(BaseModel):
     )
     messages: list[LLMMessage] = Field(description="Messages of the agent", default=[])
+    plan: str | None = Field(
+        default=None, description="The execution plan that was generated, if any"
+    )
+    todos: list[TodoExecutionResult] = Field(
+        default_factory=list,
+        description="List of todos that were executed with their results",
+    )
+    replan_count: int = Field(
+        default=0, description="Number of times the plan was regenerated"
+    )
+    last_replan_reason: str | None = Field(
+        default=None, description="Reason for the last replan, if any"
+    )
+
+    @classmethod
+    def from_todo_items(cls, todo_items: list[TodoItem]) -> list[TodoExecutionResult]:
+        """Convert TodoItem objects to TodoExecutionResult summaries.
+
+        Args:
+            todo_items: List of TodoItem objects from execution.
+
+        Returns:
+            List of TodoExecutionResult summaries.
+        """
+        return [
+            TodoExecutionResult(
+                step_number=item.step_number,
+                description=item.description,
+                tool_used=item.tool_to_use,
+                status=item.status,
+                result=item.result,
+                depends_on=item.depends_on,
+            )
+            for item in todo_items
+        ]
+
     def to_dict(self) -> dict[str, Any]:
         """Convert pydantic_output to a dictionary."""
         if self.pydantic:
             return self.pydantic.model_dump()
         return {}
 
+    @property
+    def completed_todos(self) -> list[TodoExecutionResult]:
+        """Get only the completed todos."""
+        return [t for t in self.todos if t.status == "completed"]
+
+    @property
+    def failed_todos(self) -> list[TodoExecutionResult]:
+        """Get only the failed todos."""
+        return [t for t in self.todos if t.status == "failed"]
+
+    @property
+    def had_plan(self) -> bool:
+        """Check if the agent executed with a plan."""
+        return self.plan is not None or len(self.todos) > 0
+
     def __str__(self) -> str:
         """Return the raw output as a string."""
         return self.raw
+
+    def __repr__(self) -> str:
+        """Return a detailed representation including a todo summary."""
+        parts = [f"LiteAgentOutput(role={self.agent_role!r}"]
+        if self.todos:
+            completed = len(self.completed_todos)
+            total = len(self.todos)
+            parts.append(f", todos={completed}/{total} completed")
+        if self.replan_count > 0:
+            parts.append(f", replans={self.replan_count}")
+        parts.append(")")
+        return "".join(parts)
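A quick consumption sketch for the helpers above, on hand-built values (field defaults for the unshown `LiteAgentOutput` fields are assumed from the surrounding definitions):

# Sketch: working with TodoExecutionResult summaries on a LiteAgentOutput.
from crewai.lite_agent_output import LiteAgentOutput, TodoExecutionResult

output = LiteAgentOutput(
    raw="done",
    agent_role="Researcher",
    todos=[
        TodoExecutionResult(step_number=1, description="Fetch data",
                            status="completed", result="rows: 120"),
        TodoExecutionResult(step_number=2, description="Summarize",
                            status="failed", result="Error: timeout"),
    ],
    replan_count=1,
)

assert output.had_plan
print(repr(output))  # LiteAgentOutput(role='Researcher', todos=1/2 completed, replans=1)
print([t.step_number for t in output.failed_todos])  # [2]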
diff --git a/lib/crewai/src/crewai/utilities/planning_types.py b/lib/crewai/src/crewai/utilities/planning_types.py
index bafd04453..69bc079cd 100644
--- a/lib/crewai/src/crewai/utilities/planning_types.py
+++ b/lib/crewai/src/crewai/utilities/planning_types.py
@@ -101,3 +101,46 @@ class TodoList(BaseModel):
                 item.status = "completed"
                 if result:
                     item.result = result
+
+    def _dependencies_satisfied(self, item: TodoItem) -> bool:
+        """Check if all dependencies for a todo item are completed.
+
+        Args:
+            item: The todo item to check dependencies for.
+
+        Returns:
+            True if all dependencies are completed, False otherwise.
+        """
+        for dep_num in item.depends_on:
+            dep = self.get_by_step_number(dep_num)
+            if dep is None or dep.status != "completed":
+                return False
+        return True
+
+    def get_ready_todos(self) -> list[TodoItem]:
+        """Get all todos that are ready to execute (pending with satisfied dependencies).
+
+        Returns:
+            List of TodoItem objects that can be executed now.
+        """
+        ready: list[TodoItem] = []
+        for item in self.items:
+            if item.status != "pending":
+                continue
+            if self._dependencies_satisfied(item):
+                ready.append(item)
+        return ready
+
+    @property
+    def can_parallelize(self) -> bool:
+        """Check if multiple todos can run in parallel.
+
+        Returns:
+            True if more than one todo is ready to execute.
+        """
+        return len(self.get_ready_todos()) > 1
+
+    @property
+    def running_count(self) -> int:
+        """Count of currently running todos."""
+        return sum(1 for item in self.items if item.status == "running")
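Taken together, the new helpers implement dependency unlocking: completing a step frees its dependents. A runnable sketch, assuming `TodoItem` accepts these fields and defaults `status` to "pending":

# Sketch: dependency unlocking with the new TodoList helpers.
from crewai.utilities.planning_types import TodoItem, TodoList

todos = TodoList(items=[
    TodoItem(step_number=1, description="Fetch sales data"),
    TodoItem(step_number=2, description="Fetch churn data"),
    TodoItem(step_number=3, description="Write report", depends_on=[1, 2]),
])

assert todos.can_parallelize           # steps 1 and 2 are both ready
todos.mark_running(1)
todos.mark_running(2)
assert todos.running_count == 2
assert todos.get_ready_todos() == []   # step 3 is still blocked

todos.mark_completed(1, result="sales: ok")
todos.mark_completed(2, result="churn: ok")
ready = todos.get_ready_todos()
assert [t.step_number for t in ready] == [3]  # dependencies now satisfied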