addressing critical comments

This commit is contained in:
lorenzejay
2026-03-12 10:55:57 -07:00
parent cabe02c5a1
commit 5cb3a02b03
3 changed files with 115 additions and 104 deletions

View File

@@ -1099,124 +1099,135 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
@router("multiple_todos_ready")
async def execute_todos_parallel(self) -> Literal["parallel_todos_complete"]:
"""Execute multiple independent todos concurrently.
"""Execute multiple independent todos concurrently via StepExecutor.
When multiple todos have their dependencies satisfied, they can
run in parallel for efficiency.
Uses the same StepExecutor path as sequential execution so that
parallel steps get: multi-turn action loops, tool usage events,
security context, vision sentinel handling, and hooks.
After all steps complete, each result is observed sequentially
through PlannerObserver so the planning system stays informed.
"""
ready = self.state.todos.get_ready_todos()
# Mark all ready todos as running
for todo in ready:
self.state.todos.mark_running(todo.step_number)
# Execute each todo in parallel
tasks = [self._execute_single_todo_async(todo) for todo in ready]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Build context and executor for each todo, then run in parallel
async def _run_step(todo: TodoItem) -> tuple[TodoItem, object]:
step_executor = self._ensure_step_executor()
context = self._build_context_for_todo(todo)
result = await asyncio.to_thread(step_executor.execute, todo, context)
return todo, result
# Store results and mark completed/failed
for todo, result in zip(ready, results, strict=True):
if isinstance(result, Exception):
error_msg = f"Error: {result!s}"
self.state.todos.mark_failed(todo.step_number, result=error_msg)
if self.agent.verbose:
self._printer.print(
content=f"Todo {todo.step_number} failed: {error_msg}",
color="red",
)
gathered = await asyncio.gather(
*[_run_step(todo) for todo in ready],
return_exceptions=True,
)
# Process results: store on todos and log, then observe each
step_results: list[tuple[TodoItem, object]] = []
for item in gathered:
if isinstance(item, Exception):
# Find which todo this was for — mark first running todo as failed
for todo in ready:
if todo.status == "running":
error_msg = f"Error: {item!s}"
todo.result = error_msg
self.state.todos.mark_failed(
todo.step_number, result=error_msg
)
if self.agent.verbose:
self._printer.print(
content=f"Todo {todo.step_number} failed: {error_msg}",
color="red",
)
break
else:
self._mark_todo_as_completed(todo.step_number, str(result))
todo, result = item
todo.result = result.result
return "parallel_todos_complete"
async def _execute_single_todo_async(self, todo: TodoItem) -> str:
"""Execute a single todo item asynchronously.
Args:
todo: The todo item to execute.
Returns:
The result of executing the todo.
"""
# Build messages for this specific todo
messages: list[LLMMessage] = [
{"role": "system", "content": self._get_todo_system_prompt()},
]
# Inject context into messages for parallel execution (since history is empty)
if todo.depends_on:
dep_results = []
for dep_num in todo.depends_on:
dep = self.state.todos.get_by_step_number(dep_num)
if dep and dep.result:
dep_results.append(f"Step {dep_num} result: {dep.result}")
if dep_results:
messages.append(
self.state.execution_log.append(
{
"role": "system",
"content": "Context from previous steps:\n"
+ "\n".join(dep_results),
"type": "step_execution",
"step_number": todo.step_number,
"success": result.success,
"result_preview": result.result[:200]
if result.result
else "",
"error": result.error,
"tool_calls": result.tool_calls_made,
"execution_time": result.execution_time,
}
)
todo_prompt = self._build_todo_prompt(todo, include_dependencies=False)
messages.append({"role": "user", "content": todo_prompt})
if self.agent.verbose:
status = "success" if result.success else "failed"
self._printer.print(
content=(
f"[Execute] Step {todo.step_number} {status} "
f"({result.execution_time:.1f}s, "
f"{len(result.tool_calls_made)} tool calls)"
),
color="green" if result.success else "red",
)
step_results.append((todo, result))
# If the todo specifies a tool and we have native tool support
if todo.tool_to_use and self.state.use_native_tools:
try:
response = await asyncio.to_thread(
self.llm.call,
messages,
tools=self._openai_tools,
available_functions=self._available_functions,
# Observe each completed step sequentially (observation updates shared state)
effort = self._get_reasoning_effort()
observer = self._ensure_planner_observer()
for todo, _result in step_results:
all_completed = self.state.todos.get_completed_todos()
remaining = self.state.todos.get_pending_todos()
observation = observer.observe(
completed_step=todo,
result=todo.result or "",
all_completed=all_completed,
remaining_todos=remaining,
)
self.state.observations[todo.step_number] = observation
self.state.execution_log.append(
{
"type": "observation",
"step_number": todo.step_number,
"step_completed_successfully": observation.step_completed_successfully,
"key_information_learned": observation.key_information_learned,
"remaining_plan_still_valid": observation.remaining_plan_still_valid,
"needs_full_replan": observation.needs_full_replan,
"goal_already_achieved": observation.goal_already_achieved,
"reasoning_effort": effort,
}
)
# Mark based on observation result
if observation.step_completed_successfully:
self.state.todos.mark_completed(
todo.step_number, result=todo.result
)
else:
self.state.todos.mark_failed(
todo.step_number, result=todo.result
)
# Handle tool calls if returned
if isinstance(response, list) and response:
# Execute the tool call
tool_results = []
for tool_call in response:
info = extract_tool_call_info(tool_call)
if info:
_call_id, func_name, func_args = info
if func_name in self._available_functions:
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
tool_results.append(str(result))
return "\n".join(tool_results) if tool_results else str(response)
if self.agent.verbose:
self._printer.print(
content=(
f"[Observe] Step {todo.step_number} "
f"(effort={effort}): "
f"success={observation.step_completed_successfully}, "
f"plan_valid={observation.remaining_plan_still_valid}, "
f"learned={observation.key_information_learned[:80]}..."
),
color="cyan",
)
return str(response)
except Exception as e:
return f"Tool execution error: {e!s}"
# Standard LLM call without tools
try:
response = await asyncio.to_thread(self.llm.call, messages)
return str(response)
except Exception as e:
return f"LLM call error: {e!s}"
def _get_todo_system_prompt(self) -> str:
"""Get the system prompt for todo execution.
Returns:
A system prompt for focused step execution.
"""
role = self.agent.role if self.agent else "Assistant"
goal = self.agent.goal if self.agent else "Complete tasks efficiently"
return self._i18n.retrieve("planning", "todo_system_prompt").format(
role=role,
goal=goal,
)
return "parallel_todos_complete"
@router("parallel_todos_complete")
def after_parallel_execution(

View File

@@ -264,6 +264,7 @@ class StepObservation(BaseModel):
if isinstance(v, dict):
return [v]
return v
needs_full_replan: bool = Field(
default=False,
description="The remaining plan is fundamentally wrong and must be regenerated",

View File

@@ -296,9 +296,10 @@ class AgentReasoning:
attempt = 1
max_attempts = self.config.max_attempts
task_id = str(self.task.id) if self.task else "kickoff"
current_attempt = attempt + 1
while not ready and (max_attempts is None or attempt < max_attempts):
attempt += 1
# Emit event for each refinement attempt
try:
crewai_event_bus.emit(
@@ -306,7 +307,7 @@ class AgentReasoning:
AgentReasoningStartedEvent(
agent_role=self.agent.role,
task_id=task_id,
attempt=current_attempt,
attempt=attempt,
from_task=self.task,
),
)
@@ -336,7 +337,7 @@ class AgentReasoning:
task_id=task_id,
plan=plan,
ready=ready,
attempt=current_attempt,
attempt=attempt,
from_task=self.task,
from_agent=self.agent,
),
@@ -344,8 +345,6 @@ class AgentReasoning:
except Exception: # noqa: S110
pass
attempt += 1
if max_attempts is not None and attempt >= max_attempts:
self.logger.warning(
f"Agent planning reached maximum attempts ({max_attempts}) "