mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-02 20:58:30 +00:00
Previously, we only supported tools from the crewai-tools open-source repository. Now, we're introducing improved support for private tool repositories.
477 lines
15 KiB
Python
477 lines
15 KiB
Python
import json
|
|
import re
|
|
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
|
|
|
|
from crewai.agents.parser import (
|
|
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
|
|
AgentAction,
|
|
AgentFinish,
|
|
CrewAgentParser,
|
|
OutputParserException,
|
|
)
|
|
from crewai.llm import LLM
|
|
from crewai.llms.base_llm import BaseLLM
|
|
from crewai.tools import BaseTool as CrewAITool
|
|
from crewai.tools.base_tool import BaseTool
|
|
from crewai.tools.structured_tool import CrewStructuredTool
|
|
from crewai.tools.tool_types import ToolResult
|
|
from crewai.utilities import I18N, Printer
|
|
from crewai.utilities.errors import AgentRepositoryError
|
|
from crewai.utilities.exceptions.context_window_exceeding_exception import (
|
|
LLMContextLengthExceededException,
|
|
)
|
|
|
|
|
|
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
|
|
"""Parse tools to be used for the task."""
|
|
tools_list = []
|
|
|
|
for tool in tools:
|
|
if isinstance(tool, CrewAITool):
|
|
tools_list.append(tool.to_structured_tool())
|
|
else:
|
|
raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
|
|
|
|
return tools_list
|
|
|
|
|
|
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
|
|
"""Get the names of the tools."""
|
|
return ", ".join([t.name for t in tools])
|
|
|
|
|
|
def render_text_description_and_args(
|
|
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
|
|
) -> str:
|
|
"""Render the tool name, description, and args in plain text.
|
|
|
|
search: This tool is used for search, args: {"query": {"type": "string"}}
|
|
calculator: This tool is used for math, \
|
|
args: {"expression": {"type": "string"}}
|
|
"""
|
|
tool_strings = []
|
|
for tool in tools:
|
|
tool_strings.append(tool.description)
|
|
|
|
return "\n".join(tool_strings)
|
|
|
|
|
|
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
|
|
"""Check if the maximum number of iterations has been reached."""
|
|
return iterations >= max_iterations
|
|
|
|
|
|
def handle_max_iterations_exceeded(
|
|
formatted_answer: Union[AgentAction, AgentFinish, None],
|
|
printer: Printer,
|
|
i18n: I18N,
|
|
messages: List[Dict[str, str]],
|
|
llm: Union[LLM, BaseLLM],
|
|
callbacks: List[Any],
|
|
) -> Union[AgentAction, AgentFinish]:
|
|
"""
|
|
Handles the case when the maximum number of iterations is exceeded.
|
|
Performs one more LLM call to get the final answer.
|
|
|
|
Parameters:
|
|
formatted_answer: The last formatted answer from the agent.
|
|
|
|
Returns:
|
|
The final formatted answer after exceeding max iterations.
|
|
"""
|
|
printer.print(
|
|
content="Maximum iterations reached. Requesting final answer.",
|
|
color="yellow",
|
|
)
|
|
|
|
if formatted_answer and hasattr(formatted_answer, "text"):
|
|
assistant_message = (
|
|
formatted_answer.text + f'\n{i18n.errors("force_final_answer")}'
|
|
)
|
|
else:
|
|
assistant_message = i18n.errors("force_final_answer")
|
|
|
|
messages.append(format_message_for_llm(assistant_message, role="assistant"))
|
|
|
|
# Perform one more LLM call to get the final answer
|
|
answer = llm.call(
|
|
messages,
|
|
callbacks=callbacks,
|
|
)
|
|
|
|
if answer is None or answer == "":
|
|
printer.print(
|
|
content="Received None or empty response from LLM call.",
|
|
color="red",
|
|
)
|
|
raise ValueError("Invalid response from LLM call - None or empty.")
|
|
|
|
formatted_answer = format_answer(answer)
|
|
# Return the formatted answer, regardless of its type
|
|
return formatted_answer
|
|
|
|
|
|
def format_message_for_llm(prompt: str, role: str = "user") -> Dict[str, str]:
|
|
prompt = prompt.rstrip()
|
|
return {"role": role, "content": prompt}
|
|
|
|
|
|
def format_answer(answer: str) -> Union[AgentAction, AgentFinish]:
|
|
"""Format a response from the LLM into an AgentAction or AgentFinish."""
|
|
try:
|
|
return CrewAgentParser.parse_text(answer)
|
|
except Exception:
|
|
# If parsing fails, return a default AgentFinish
|
|
return AgentFinish(
|
|
thought="Failed to parse LLM response",
|
|
output=answer,
|
|
text=answer,
|
|
)
|
|
|
|
|
|
def enforce_rpm_limit(
|
|
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
|
|
) -> None:
|
|
"""Enforce the requests per minute (RPM) limit if applicable."""
|
|
if request_within_rpm_limit:
|
|
request_within_rpm_limit()
|
|
|
|
|
|
def get_llm_response(
|
|
llm: Union[LLM, BaseLLM],
|
|
messages: List[Dict[str, str]],
|
|
callbacks: List[Any],
|
|
printer: Printer,
|
|
) -> str:
|
|
"""Call the LLM and return the response, handling any invalid responses."""
|
|
try:
|
|
answer = llm.call(
|
|
messages,
|
|
callbacks=callbacks,
|
|
)
|
|
except Exception as e:
|
|
printer.print(
|
|
content=f"Error during LLM call: {e}",
|
|
color="red",
|
|
)
|
|
raise e
|
|
if not answer:
|
|
printer.print(
|
|
content="Received None or empty response from LLM call.",
|
|
color="red",
|
|
)
|
|
raise ValueError("Invalid response from LLM call - None or empty.")
|
|
|
|
return answer
|
|
|
|
|
|
def process_llm_response(
|
|
answer: str, use_stop_words: bool
|
|
) -> Union[AgentAction, AgentFinish]:
|
|
"""Process the LLM response and format it into an AgentAction or AgentFinish."""
|
|
if not use_stop_words:
|
|
try:
|
|
# Preliminary parsing to check for errors.
|
|
format_answer(answer)
|
|
except OutputParserException as e:
|
|
if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
|
|
answer = answer.split("Observation:")[0].strip()
|
|
|
|
return format_answer(answer)
|
|
|
|
|
|
def handle_agent_action_core(
|
|
formatted_answer: AgentAction,
|
|
tool_result: ToolResult,
|
|
messages: Optional[List[Dict[str, str]]] = None,
|
|
step_callback: Optional[Callable] = None,
|
|
show_logs: Optional[Callable] = None,
|
|
) -> Union[AgentAction, AgentFinish]:
|
|
"""Core logic for handling agent actions and tool results.
|
|
|
|
Args:
|
|
formatted_answer: The agent's action
|
|
tool_result: The result of executing the tool
|
|
messages: Optional list of messages to append results to
|
|
step_callback: Optional callback to execute after processing
|
|
show_logs: Optional function to show logs
|
|
|
|
Returns:
|
|
Either an AgentAction or AgentFinish
|
|
"""
|
|
if step_callback:
|
|
step_callback(tool_result)
|
|
|
|
formatted_answer.text += f"\nObservation: {tool_result.result}"
|
|
formatted_answer.result = tool_result.result
|
|
|
|
if tool_result.result_as_answer:
|
|
return AgentFinish(
|
|
thought="",
|
|
output=tool_result.result,
|
|
text=formatted_answer.text,
|
|
)
|
|
|
|
if show_logs:
|
|
show_logs(formatted_answer)
|
|
|
|
if messages is not None:
|
|
messages.append({"role": "assistant", "content": tool_result.result})
|
|
|
|
return formatted_answer
|
|
|
|
|
|
def handle_unknown_error(printer: Any, exception: Exception) -> None:
|
|
"""Handle unknown errors by informing the user.
|
|
|
|
Args:
|
|
printer: Printer instance for output
|
|
exception: The exception that occurred
|
|
"""
|
|
printer.print(
|
|
content="An unknown error occurred. Please check the details below.",
|
|
color="red",
|
|
)
|
|
printer.print(
|
|
content=f"Error details: {exception}",
|
|
color="red",
|
|
)
|
|
|
|
|
|
def handle_output_parser_exception(
|
|
e: OutputParserException,
|
|
messages: List[Dict[str, str]],
|
|
iterations: int,
|
|
log_error_after: int = 3,
|
|
printer: Optional[Any] = None,
|
|
) -> AgentAction:
|
|
"""Handle OutputParserException by updating messages and formatted_answer.
|
|
|
|
Args:
|
|
e: The OutputParserException that occurred
|
|
messages: List of messages to append to
|
|
iterations: Current iteration count
|
|
log_error_after: Number of iterations after which to log errors
|
|
printer: Optional printer instance for logging
|
|
|
|
Returns:
|
|
AgentAction: A formatted answer with the error
|
|
"""
|
|
messages.append({"role": "user", "content": e.error})
|
|
|
|
formatted_answer = AgentAction(
|
|
text=e.error,
|
|
tool="",
|
|
tool_input="",
|
|
thought="",
|
|
)
|
|
|
|
if iterations > log_error_after and printer:
|
|
printer.print(
|
|
content=f"Error parsing LLM output, agent will retry: {e.error}",
|
|
color="red",
|
|
)
|
|
|
|
return formatted_answer
|
|
|
|
|
|
def is_context_length_exceeded(exception: Exception) -> bool:
|
|
"""Check if the exception is due to context length exceeding.
|
|
|
|
Args:
|
|
exception: The exception to check
|
|
|
|
Returns:
|
|
bool: True if the exception is due to context length exceeding
|
|
"""
|
|
return LLMContextLengthExceededException(str(exception))._is_context_limit_error(
|
|
str(exception)
|
|
)
|
|
|
|
|
|
def handle_context_length(
|
|
respect_context_window: bool,
|
|
printer: Any,
|
|
messages: List[Dict[str, str]],
|
|
llm: Any,
|
|
callbacks: List[Any],
|
|
i18n: Any,
|
|
) -> None:
|
|
"""Handle context length exceeded by either summarizing or raising an error.
|
|
|
|
Args:
|
|
respect_context_window: Whether to respect context window
|
|
printer: Printer instance for output
|
|
messages: List of messages to summarize
|
|
llm: LLM instance for summarization
|
|
callbacks: List of callbacks for LLM
|
|
i18n: I18N instance for messages
|
|
"""
|
|
if respect_context_window:
|
|
printer.print(
|
|
content="Context length exceeded. Summarizing content to fit the model context window. Might take a while...",
|
|
color="yellow",
|
|
)
|
|
summarize_messages(messages, llm, callbacks, i18n)
|
|
else:
|
|
printer.print(
|
|
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
|
|
color="red",
|
|
)
|
|
raise SystemExit(
|
|
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
|
|
)
|
|
|
|
|
|
def summarize_messages(
|
|
messages: List[Dict[str, str]],
|
|
llm: Any,
|
|
callbacks: List[Any],
|
|
i18n: Any,
|
|
) -> None:
|
|
"""Summarize messages to fit within context window.
|
|
|
|
Args:
|
|
messages: List of messages to summarize
|
|
llm: LLM instance for summarization
|
|
callbacks: List of callbacks for LLM
|
|
i18n: I18N instance for messages
|
|
"""
|
|
messages_string = " ".join([message["content"] for message in messages])
|
|
messages_groups = []
|
|
|
|
cut_size = llm.get_context_window_size()
|
|
|
|
for i in range(0, len(messages_string), cut_size):
|
|
messages_groups.append({"content": messages_string[i : i + cut_size]})
|
|
|
|
summarized_contents = []
|
|
|
|
total_groups = len(messages_groups)
|
|
for idx, group in enumerate(messages_groups, 1):
|
|
Printer().print(
|
|
content=f"Summarizing {idx}/{total_groups}...",
|
|
color="yellow",
|
|
)
|
|
summary = llm.call(
|
|
[
|
|
format_message_for_llm(
|
|
i18n.slice("summarizer_system_message"), role="system"
|
|
),
|
|
format_message_for_llm(
|
|
i18n.slice("summarize_instruction").format(group=group["content"]),
|
|
),
|
|
],
|
|
callbacks=callbacks,
|
|
)
|
|
summarized_contents.append({"content": str(summary)})
|
|
|
|
merged_summary = " ".join(content["content"] for content in summarized_contents)
|
|
|
|
messages.clear()
|
|
messages.append(
|
|
format_message_for_llm(
|
|
i18n.slice("summary").format(merged_summary=merged_summary)
|
|
)
|
|
)
|
|
|
|
|
|
def show_agent_logs(
|
|
printer: Printer,
|
|
agent_role: str,
|
|
formatted_answer: Optional[Union[AgentAction, AgentFinish]] = None,
|
|
task_description: Optional[str] = None,
|
|
verbose: bool = False,
|
|
) -> None:
|
|
"""Show agent logs for both start and execution states.
|
|
|
|
Args:
|
|
printer: Printer instance for output
|
|
agent_role: Role of the agent
|
|
formatted_answer: Optional AgentAction or AgentFinish for execution logs
|
|
task_description: Optional task description for start logs
|
|
verbose: Whether to show verbose output
|
|
"""
|
|
if not verbose:
|
|
return
|
|
|
|
agent_role = agent_role.split("\n")[0]
|
|
|
|
if formatted_answer is None:
|
|
# Start logs
|
|
printer.print(
|
|
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
|
|
)
|
|
if task_description:
|
|
printer.print(
|
|
content=f"\033[95m## Task:\033[00m \033[92m{task_description}\033[00m"
|
|
)
|
|
else:
|
|
# Execution logs
|
|
printer.print(
|
|
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
|
|
)
|
|
|
|
if isinstance(formatted_answer, AgentAction):
|
|
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
|
|
formatted_json = json.dumps(
|
|
formatted_answer.tool_input,
|
|
indent=2,
|
|
ensure_ascii=False,
|
|
)
|
|
if thought and thought != "":
|
|
printer.print(
|
|
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
|
|
)
|
|
printer.print(
|
|
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
|
|
)
|
|
printer.print(
|
|
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
|
|
)
|
|
printer.print(
|
|
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
|
|
)
|
|
elif isinstance(formatted_answer, AgentFinish):
|
|
printer.print(
|
|
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
|
|
)
|
|
|
|
|
|
def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
|
|
attributes: Dict[str, Any] = {}
|
|
if from_repository:
|
|
import importlib
|
|
|
|
from crewai.cli.authentication.token import get_auth_token
|
|
from crewai.cli.plus_api import PlusAPI
|
|
|
|
client = PlusAPI(api_key=get_auth_token())
|
|
response = client.get_agent(from_repository)
|
|
if response.status_code == 404:
|
|
raise AgentRepositoryError(
|
|
f"Agent {from_repository} does not exist, make sure the name is correct or the agent is available on your organization"
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
raise AgentRepositoryError(
|
|
f"Agent {from_repository} could not be loaded: {response.text}"
|
|
)
|
|
|
|
agent = response.json()
|
|
for key, value in agent.items():
|
|
if key == "tools":
|
|
attributes[key] = []
|
|
for tool in value:
|
|
try:
|
|
module = importlib.import_module(tool["module"])
|
|
tool_class = getattr(module, tool["name"])
|
|
attributes[key].append(tool_class())
|
|
except Exception as e:
|
|
raise AgentRepositoryError(
|
|
f"Tool {tool['name']} could not be loaded: {e}"
|
|
) from e
|
|
else:
|
|
attributes[key] = value
|
|
return attributes
|