Feat/individual react agent (#2483)

* WIP

* WIP

* wip

* wip

* WIP

* More WIP

* Its working but needs a massive clean up

* output type works now

* Usage metrics fixed

* more testing

* WIP

* cleaning up

* Update logger

* 99% done. Need to make docs match new example

* cleanup

* drop hard coded examples

* docs

* Clean up

* Fix errors

* Trying to fix CI issues

* more type checker fixes

* More type checking fixes

* Update LiteAgent documentation for clarity and consistency; replace WebsiteSearchTool with SerperDevTool, and improve formatting in examples.

* fix fingerprinting issues

* fix type-checker

* Fix type-checker issue by adding type ignore comment for cache read in ToolUsage class

* Add optional agent parameter to CrewAgentParser and enhance action handling logic

* Remove unused parameters from ToolUsage instantiation in tests and clean up debug print statement in CrewAgentParser.

* Remove deprecated test files and examples for LiteAgent; add comprehensive tests for LiteAgent functionality, including tool usage and structured output handling.

* Remove unused variable 'result' from ToolUsage class to clean up code.

* Add initialization for 'result' variable in ToolUsage class to resolve type-checker warnings

* Refactor agent_utils.py by removing unused event imports and adding missing commas in function definitions. Update test_events.py to reflect changes in expected event counts and adjust assertions accordingly. Modify test_tools_emits_error_events.yaml to include new headers and update response content for consistency with recent API changes.

* Enhance tests in crew_test.py by verifying cache behavior in test_tools_with_custom_caching and ensuring proper agent initialization with added commas in test_crew_kickoff_for_each_works_with_manager_agent_copy.

* Update agent tests to reflect changes in expected call counts and improve response formatting in YAML cassette. Adjusted mock call count from 2 to 3 and refined interaction formats for clarity and consistency.

* Refactor agent tests to update model versions and improve response formatting in YAML cassettes. Changed model references from 'o1-preview' to 'o3-mini' and adjusted interaction formats for consistency. Enhanced error handling in context length tests and refined mock setups for better clarity.

* Update tool usage logging to ensure tool arguments are consistently formatted as strings. Adjust agent test cases to reflect changes in maximum iterations and expected outputs, enhancing clarity in assertions. Update YAML cassettes to align with new response formats and improve overall consistency across tests.

* Update YAML cassette for LLM tests to reflect changes in response structure and model version. Adjusted request and response headers, including updated content length and user agent. Enhanced token limits and request counts for improved testing accuracy.

* Update tool usage logging to store tool arguments as native types instead of strings, enhancing data integrity and usability.

* Refactor agent tests by removing outdated test cases and updating YAML cassettes to reflect changes in tool usage and response formats. Adjusted request and response headers, including user agent and content length, for improved accuracy in testing. Enhanced interaction formats for consistency across tests.

* Add Excalidraw diagram file for visual representation of input-output flow

Created a new Excalidraw file that includes a diagram illustrating the input box, database, and output box with connecting arrows. This visual aid enhances understanding of the data flow within the application.

* Remove redundant error handling for action and final answer in CrewAgentParser. Update tests to reflect this change by deleting the corresponding test case.

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
Co-authored-by: Lorenze Jay <lorenzejaytech@gmail.com>
This commit is contained in:
Brandon Hancock (bhancock_ai)
2025-04-02 11:54:46 -04:00
committed by GitHub
parent 9b51e1174c
commit efe27bd570
42 changed files with 21642 additions and 12531 deletions

View File

@@ -1,41 +1,40 @@
import json
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import (
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
AgentAction,
AgentFinish,
CrewAgentParser,
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
handle_agent_action_core,
handle_context_length,
handle_max_iterations_exceeded,
handle_output_parser_exception,
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
process_llm_response,
show_agent_logs,
)
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import (
ToolUsageErrorEvent,
crewai_event_bus,
)
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.logger import Logger
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
@dataclass
class ToolResult:
result: Any
result_as_answer: bool
class CrewAgentExecutor(CrewAgentExecutorMixin):
_logger: Logger = Logger()
@@ -47,7 +46,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[BaseTool],
tools: List[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
tools_description: str,
@@ -83,7 +82,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, BaseTool] = {
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
@@ -99,11 +98,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
self.messages.append(self._format_msg(system_prompt, role="system"))
self.messages.append(self._format_msg(user_prompt))
self.messages.append(format_message_for_llm(system_prompt, role="system"))
self.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(self._format_msg(user_prompt))
self.messages.append(format_message_for_llm(user_prompt))
self._show_start_logs()
@@ -118,7 +117,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
raise
except Exception as e:
self._handle_unknown_error(e)
handle_unknown_error(self._printer, e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
@@ -140,16 +139,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
try:
if self._has_reached_max_iterations():
formatted_answer = self._handle_max_iterations_exceeded(
formatted_answer
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
formatted_answer,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
break
self._enforce_rpm_limit()
enforce_rpm_limit(self.request_within_rpm_limit)
answer = self._get_llm_response()
formatted_answer = self._process_llm_response(answer)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
)
formatted_answer = process_llm_response(answer, self.use_stop_words)
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
@@ -165,8 +173,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
}
tool_result = self._execute_tool_and_check_finality(
formatted_answer, fingerprint_context=fingerprint_context
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
fingerprint_context=fingerprint_context,
tools=self.tools,
i18n=self._i18n,
agent_key=self.agent.key if self.agent else None,
agent_role=self.agent.role if self.agent else None,
tools_handler=self.tools_handler,
task=self.task,
agent=self.agent,
function_calling_llm=self.function_calling_llm,
)
formatted_answer = self._handle_agent_action(
formatted_answer, tool_result
@@ -176,17 +193,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._append_message(formatted_answer.text, role="assistant")
except OutputParserException as e:
formatted_answer = self._handle_output_parser_exception(e)
formatted_answer = handle_output_parser_exception(
e=e,
messages=self.messages,
iterations=self.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
if self._is_context_length_exceeded(e):
self._handle_context_length()
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
else:
self._handle_unknown_error(e)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -199,89 +229,27 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user."""
self._printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
self._printer.print(
content=f"Error details: {exception}",
color="red",
)
def _has_reached_max_iterations(self) -> bool:
"""Check if the maximum number of iterations has been reached."""
return self.iterations >= self.max_iter
def _enforce_rpm_limit(self) -> None:
"""Enforce the requests per minute (RPM) limit if applicable."""
if self.request_within_rpm_limit:
self.request_within_rpm_limit()
def _get_llm_response(self) -> str:
"""Call the LLM and return the response, handling any invalid responses."""
try:
answer = self.llm.call(
self.messages,
callbacks=self.callbacks,
)
except Exception as e:
self._printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
self._printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return answer
def _process_llm_response(self, answer: str) -> Union[AgentAction, AgentFinish]:
"""Process the LLM response and format it into an AgentAction or AgentFinish."""
if not self.use_stop_words:
try:
# Preliminary parsing to check for errors.
self._format_answer(answer)
except OutputParserException as e:
if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
answer = answer.split("Observation:")[0].strip()
return self._format_answer(answer)
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
isinstance(add_image_tool, dict)
and formatted_answer.tool.casefold().strip()
== add_image_tool.get("name", "").casefold().strip()
):
self.messages.append(tool_result.result)
return formatted_answer # Continue the loop
self.messages.append({"role": "assistant", "content": tool_result.result})
return formatted_answer
if self.step_callback:
self.step_callback(tool_result)
formatted_answer.text += f"\nObservation: {tool_result.result}"
formatted_answer.result = tool_result.result
if tool_result.result_as_answer:
return AgentFinish(
thought="",
output=tool_result.result,
text=formatted_answer.text,
)
self._show_logs(formatted_answer)
return formatted_answer
return handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,
messages=self.messages,
step_callback=self.step_callback,
show_logs=self._show_logs,
)
def _invoke_step_callback(self, formatted_answer) -> None:
"""Invoke the step callback if it exists."""
@@ -290,175 +258,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self.messages.append(self._format_msg(text, role=role))
def _handle_output_parser_exception(self, e: OutputParserException) -> AgentAction:
"""Handle OutputParserException by updating messages and formatted_answer."""
self.messages.append({"role": "user", "content": e.error})
formatted_answer = AgentAction(
text=e.error,
tool="",
tool_input="",
thought="",
)
if self.iterations > self.log_error_after:
self._printer.print(
content=f"Error parsing LLM output, agent will retry: {e.error}",
color="red",
)
return formatted_answer
def _is_context_length_exceeded(self, exception: Exception) -> bool:
"""Check if the exception is due to context length exceeding."""
return LLMContextLengthExceededException(
str(exception)
)._is_context_limit_error(str(exception))
self.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self):
"""Show logs for the start of agent execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
agent_role = self.agent.role.split("\n")[0]
self._printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
description = (
show_agent_logs(
printer=self._printer,
agent_role=self.agent.role,
task_description=(
getattr(self.task, "description") if self.task else "Not Found"
)
self._printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
)
),
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
)
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
"""Show logs for the agent's execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
agent_role = self.agent.role.split("\n")[0]
if isinstance(formatted_answer, AgentAction):
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
formatted_json = json.dumps(
formatted_answer.tool_input,
indent=2,
ensure_ascii=False,
)
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
if thought and thought != "":
self._printer.print(
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
)
self._printer.print(
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
)
elif isinstance(formatted_answer, AgentFinish):
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
self._printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def _execute_tool_and_check_finality(
self,
agent_action: AgentAction,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
try:
fingerprint_context = fingerprint_context or {}
if self.agent:
# Create tool usage event with fingerprint information
event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
event_data.update(fingerprint_context)
# Emit the tool usage started event with agent information
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(**event_data),
)
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
fingerprint_context=fingerprint_context, # Pass fingerprint context
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
tool_result = tool_calling.message
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
return ToolResult(result=tool_result, result_as_answer=False)
except Exception as e:
# TODO: drop
if self.agent:
error_event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"error": str(e),
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
error_event_data.update(fingerprint_context)
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(**error_event_data),
)
raise e
show_agent_logs(
printer=self._printer,
agent_role=self.agent.role,
formatted_answer=formatted_answer,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
)
def _summarize_messages(self) -> None:
messages_groups = []
@@ -466,47 +292,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
content = message["content"]
cut_size = self.llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append(content[i : i + cut_size])
messages_groups.append({"content": content[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
summary = self.llm.call(
[
self._format_msg(
format_message_for_llm(
self._i18n.slice("summarizer_system_message"), role="system"
),
self._format_msg(
self._i18n.slice("summarize_instruction").format(group=group),
format_message_for_llm(
self._i18n.slice("summarize_instruction").format(
group=group["content"]
),
),
],
callbacks=self.callbacks,
)
summarized_contents.append(summary)
summarized_contents.append({"content": str(summary)})
merged_summary = " ".join(str(content) for content in summarized_contents)
merged_summary = " ".join(content["content"] for content in summarized_contents)
self.messages = [
self._format_msg(
format_message_for_llm(
self._i18n.slice("summary").format(merged_summary=merged_summary)
)
]
def _handle_context_length(self) -> None:
if self.respect_context_window:
self._printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
self._summarize_messages()
else:
self._printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: Optional[str] = None
) -> None:
@@ -559,13 +371,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
prompt = prompt.replace("{tools}", inputs["tools"])
return prompt
def _format_answer(self, answer: str) -> Union[AgentAction, AgentFinish]:
return CrewAgentParser(agent=self.agent).parse(answer)
def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Handle human feedback with different flows for training vs regular use.
@@ -592,7 +397,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""Process feedback for training scenarios with single iteration."""
self._handle_crew_training_output(initial_answer, feedback)
self.messages.append(
self._format_msg(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
@@ -621,7 +426,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration."""
self.messages.append(
self._format_msg(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
@@ -646,45 +451,3 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
color="red",
)
def _handle_max_iterations_exceeded(self, formatted_answer):
"""
Handles the case when the maximum number of iterations is exceeded.
Performs one more LLM call to get the final answer.
Parameters:
formatted_answer: The last formatted answer from the agent.
Returns:
The final formatted answer after exceeding max iterations.
"""
self._printer.print(
content="Maximum iterations reached. Requesting final answer.",
color="yellow",
)
if formatted_answer and hasattr(formatted_answer, "text"):
assistant_message = (
formatted_answer.text + f'\n{self._i18n.errors("force_final_answer")}'
)
else:
assistant_message = self._i18n.errors("force_final_answer")
self.messages.append(self._format_msg(assistant_message, role="assistant"))
# Perform one more LLM call to get the final answer
answer = self.llm.call(
self.messages,
callbacks=self.callbacks,
)
if answer is None or answer == "":
self._printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
formatted_answer = self._format_answer(answer)
# Return the formatted answer, regardless of its type
return formatted_answer