revert crew agent executor

This commit is contained in:
lorenzejay
2026-01-22 09:28:00 -08:00
parent 85096ca086
commit bffe5aa877

View File

@@ -30,7 +30,6 @@ from crewai.hooks.llm_hooks import (
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -216,33 +215,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return self._invoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return self._invoke_loop_react()
def _invoke_loop_react(self) -> AgentFinish:
"""Execute agent loop using ReAct text-based pattern.
This is the traditional approach where tool definitions are embedded
in the prompt and the LLM outputs Action/Action Input text that is
parsed to execute tools.
Returns:
Final answer from the agent.
"""
@@ -272,7 +244,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
# breakpoint()
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
@@ -362,315 +333,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _invoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern. The LLM directly returns
structured tool calls which are executed and results fed back.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
# No tools available, fall back to simple LLM call
return self._invoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
def _invoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
def _handle_native_tool_calls(
self,
tool_calls: list[Any],
available_functions: dict[str, Callable[..., Any]],
) -> None:
"""Handle a single native tool call from the LLM.
Executes only the FIRST tool call and appends the result to message history.
This enables sequential tool execution with reflection after each tool,
allowing the LLM to reason about results before deciding on next steps.
Args:
tool_calls: List of tool calls from the LLM (only first is processed).
available_functions: Dict mapping function names to callables.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent asynchronously with given inputs.
@@ -720,29 +382,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
async def _ainvoke_loop(self) -> AgentFinish:
"""Execute agent loop asynchronously until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return await self._ainvoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return await self._ainvoke_loop_react()
async def _ainvoke_loop_react(self) -> AgentFinish:
"""Execute agent loop asynchronously using ReAct text-based pattern.
Returns:
Final answer from the agent.
"""
@@ -856,135 +495,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
async def _ainvoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop asynchronously using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
return await self._ainvoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
async def _ainvoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple async LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish: