diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 02ffd579a..8df3ed81b 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -1099,124 +1099,135 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): @router("multiple_todos_ready") async def execute_todos_parallel(self) -> Literal["parallel_todos_complete"]: - """Execute multiple independent todos concurrently. + """Execute multiple independent todos concurrently via StepExecutor. - When multiple todos have their dependencies satisfied, they can - run in parallel for efficiency. + Uses the same StepExecutor path as sequential execution so that + parallel steps get: multi-turn action loops, tool usage events, + security context, vision sentinel handling, and hooks. + + After all steps complete, each result is observed sequentially + through PlannerObserver so the planning system stays informed. """ + ready = self.state.todos.get_ready_todos() # Mark all ready todos as running for todo in ready: self.state.todos.mark_running(todo.step_number) - # Execute each todo in parallel - tasks = [self._execute_single_todo_async(todo) for todo in ready] - results = await asyncio.gather(*tasks, return_exceptions=True) + # Build context and executor for each todo, then run in parallel + async def _run_step(todo: TodoItem) -> tuple[TodoItem, object]: + step_executor = self._ensure_step_executor() + context = self._build_context_for_todo(todo) + result = await asyncio.to_thread(step_executor.execute, todo, context) + return todo, result - # Store results and mark completed/failed - for todo, result in zip(ready, results, strict=True): - if isinstance(result, Exception): - error_msg = f"Error: {result!s}" - self.state.todos.mark_failed(todo.step_number, result=error_msg) - if self.agent.verbose: - self._printer.print( - content=f"Todo {todo.step_number} failed: {error_msg}", - color="red", - ) + gathered = await asyncio.gather( + *[_run_step(todo) for todo in ready], + return_exceptions=True, + ) + + # Process results: store on todos and log, then observe each + step_results: list[tuple[TodoItem, object]] = [] + for item in gathered: + if isinstance(item, Exception): + # Find which todo this was for — mark first running todo as failed + for todo in ready: + if todo.status == "running": + error_msg = f"Error: {item!s}" + todo.result = error_msg + self.state.todos.mark_failed( + todo.step_number, result=error_msg + ) + if self.agent.verbose: + self._printer.print( + content=f"Todo {todo.step_number} failed: {error_msg}", + color="red", + ) + break else: - self._mark_todo_as_completed(todo.step_number, str(result)) + todo, result = item + todo.result = result.result - return "parallel_todos_complete" - - async def _execute_single_todo_async(self, todo: TodoItem) -> str: - """Execute a single todo item asynchronously. - - Args: - todo: The todo item to execute. - - Returns: - The result of executing the todo. - """ - # Build messages for this specific todo - messages: list[LLMMessage] = [ - {"role": "system", "content": self._get_todo_system_prompt()}, - ] - - # Inject context into messages for parallel execution (since history is empty) - if todo.depends_on: - dep_results = [] - for dep_num in todo.depends_on: - dep = self.state.todos.get_by_step_number(dep_num) - if dep and dep.result: - dep_results.append(f"Step {dep_num} result: {dep.result}") - if dep_results: - messages.append( + self.state.execution_log.append( { - "role": "system", - "content": "Context from previous steps:\n" - + "\n".join(dep_results), + "type": "step_execution", + "step_number": todo.step_number, + "success": result.success, + "result_preview": result.result[:200] + if result.result + else "", + "error": result.error, + "tool_calls": result.tool_calls_made, + "execution_time": result.execution_time, } ) - todo_prompt = self._build_todo_prompt(todo, include_dependencies=False) - messages.append({"role": "user", "content": todo_prompt}) + if self.agent.verbose: + status = "success" if result.success else "failed" + self._printer.print( + content=( + f"[Execute] Step {todo.step_number} {status} " + f"({result.execution_time:.1f}s, " + f"{len(result.tool_calls_made)} tool calls)" + ), + color="green" if result.success else "red", + ) + step_results.append((todo, result)) - # If the todo specifies a tool and we have native tool support - if todo.tool_to_use and self.state.use_native_tools: - try: - response = await asyncio.to_thread( - self.llm.call, - messages, - tools=self._openai_tools, - available_functions=self._available_functions, + # Observe each completed step sequentially (observation updates shared state) + effort = self._get_reasoning_effort() + observer = self._ensure_planner_observer() + + for todo, _result in step_results: + all_completed = self.state.todos.get_completed_todos() + remaining = self.state.todos.get_pending_todos() + + observation = observer.observe( + completed_step=todo, + result=todo.result or "", + all_completed=all_completed, + remaining_todos=remaining, + ) + + self.state.observations[todo.step_number] = observation + + self.state.execution_log.append( + { + "type": "observation", + "step_number": todo.step_number, + "step_completed_successfully": observation.step_completed_successfully, + "key_information_learned": observation.key_information_learned, + "remaining_plan_still_valid": observation.remaining_plan_still_valid, + "needs_full_replan": observation.needs_full_replan, + "goal_already_achieved": observation.goal_already_achieved, + "reasoning_effort": effort, + } + ) + + # Mark based on observation result + if observation.step_completed_successfully: + self.state.todos.mark_completed( + todo.step_number, result=todo.result + ) + else: + self.state.todos.mark_failed( + todo.step_number, result=todo.result ) - # Handle tool calls if returned - if isinstance(response, list) and response: - # Execute the tool call - tool_results = [] - for tool_call in response: - info = extract_tool_call_info(tool_call) - if info: - _call_id, func_name, func_args = info - if func_name in self._available_functions: - if isinstance(func_args, str): - try: - args_dict = json.loads(func_args) - except json.JSONDecodeError: - args_dict = {} - else: - args_dict = func_args - tool_func = self._available_functions[func_name] - result = tool_func(**args_dict) - tool_results.append(str(result)) - return "\n".join(tool_results) if tool_results else str(response) + if self.agent.verbose: + self._printer.print( + content=( + f"[Observe] Step {todo.step_number} " + f"(effort={effort}): " + f"success={observation.step_completed_successfully}, " + f"plan_valid={observation.remaining_plan_still_valid}, " + f"learned={observation.key_information_learned[:80]}..." + ), + color="cyan", + ) - return str(response) - except Exception as e: - return f"Tool execution error: {e!s}" - - # Standard LLM call without tools - try: - response = await asyncio.to_thread(self.llm.call, messages) - return str(response) - except Exception as e: - return f"LLM call error: {e!s}" - - def _get_todo_system_prompt(self) -> str: - """Get the system prompt for todo execution. - - Returns: - A system prompt for focused step execution. - """ - role = self.agent.role if self.agent else "Assistant" - goal = self.agent.goal if self.agent else "Complete tasks efficiently" - - return self._i18n.retrieve("planning", "todo_system_prompt").format( - role=role, - goal=goal, - ) + return "parallel_todos_complete" @router("parallel_todos_complete") def after_parallel_execution( diff --git a/lib/crewai/src/crewai/utilities/planning_types.py b/lib/crewai/src/crewai/utilities/planning_types.py index 5e75d00b5..e5c72ab24 100644 --- a/lib/crewai/src/crewai/utilities/planning_types.py +++ b/lib/crewai/src/crewai/utilities/planning_types.py @@ -264,6 +264,7 @@ class StepObservation(BaseModel): if isinstance(v, dict): return [v] return v + needs_full_replan: bool = Field( default=False, description="The remaining plan is fundamentally wrong and must be regenerated", diff --git a/lib/crewai/src/crewai/utilities/reasoning_handler.py b/lib/crewai/src/crewai/utilities/reasoning_handler.py index af3793c28..e0e6751f4 100644 --- a/lib/crewai/src/crewai/utilities/reasoning_handler.py +++ b/lib/crewai/src/crewai/utilities/reasoning_handler.py @@ -296,9 +296,10 @@ class AgentReasoning: attempt = 1 max_attempts = self.config.max_attempts task_id = str(self.task.id) if self.task else "kickoff" - current_attempt = attempt + 1 while not ready and (max_attempts is None or attempt < max_attempts): + attempt += 1 + # Emit event for each refinement attempt try: crewai_event_bus.emit( @@ -306,7 +307,7 @@ class AgentReasoning: AgentReasoningStartedEvent( agent_role=self.agent.role, task_id=task_id, - attempt=current_attempt, + attempt=attempt, from_task=self.task, ), ) @@ -336,7 +337,7 @@ class AgentReasoning: task_id=task_id, plan=plan, ready=ready, - attempt=current_attempt, + attempt=attempt, from_task=self.task, from_agent=self.agent, ), @@ -344,8 +345,6 @@ class AgentReasoning: except Exception: # noqa: S110 pass - attempt += 1 - if max_attempts is not None and attempt >= max_attempts: self.logger.warning( f"Agent planning reached maximum attempts ({max_attempts}) "