Removing LangChain and Rebuilding Executor (#1322)

* rebuilding executor

* removing langchain

* Making all tests good

* fixing types and adding ability for nor using system prompts

* improving types

* pleasing the types gods

* pleasing the types gods

* fixing parser, tools and executor

* making sure all tests pass

* final pass

* fixing type

* Updating Docs

* preparing to cut new version
This commit is contained in:
João Moura
2024-09-16 14:14:04 -03:00
committed by GitHub
parent 38189fb555
commit ae0c84cea6
177 changed files with 27272 additions and 1618561 deletions

View File

@@ -1,6 +1,5 @@
from .cache.cache_handler import CacheHandler
from .executor import CrewAgentExecutor
from .parser import CrewAgentParser
from .tools_handler import ToolsHandler
__all__ = ["CacheHandler", "CrewAgentExecutor", "CrewAgentParser", "ToolsHandler"]
__all__ = ["CacheHandler", "CrewAgentParser", "ToolsHandler"]

View File

@@ -102,7 +102,8 @@ class BaseAgent(ABC, BaseModel):
description="Maximum number of requests per minute for the agent execution to be respected.",
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents' disposal"
@@ -224,10 +225,8 @@ class BaseAgent(ABC, BaseModel):
# Copy llm and clear callbacks
existing_llm = shallow_copy(self.llm)
existing_llm.callbacks = []
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = type(self)(**copied_data, llm=existing_llm, tools=self.tools)
return copied_agent

View File

@@ -19,15 +19,13 @@ class CrewAgentExecutorMixin:
crew_agent: Optional["BaseAgent"]
task: Optional["Task"]
iterations: int
force_answer_max_iterations: int
have_forced_answer: bool
max_iter: int
_i18n: I18N
def _should_force_answer(self) -> bool:
"""Determine if a forced answer is required based on iteration count."""
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
return (self.iterations >= self.max_iter) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
"""Create and save a short-term memory item if conditions are met."""

View File

@@ -0,0 +1,352 @@
import json
import re
from typing import Any, Dict, List, Union
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import CrewAgentParser
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.logger import Logger
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.llm import LLM
from crewai.agents.parser import (
AgentAction,
AgentFinish,
OutputParserException,
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
)
class CrewAgentExecutor(CrewAgentExecutorMixin):
_logger: Logger = Logger()
def __init__(
self,
llm: Any,
task: Any,
crew: Any,
agent: Any,
prompt: dict[str, str],
max_iter: int,
tools: List[Any],
tools_names: str,
use_stop_words: bool,
stop_words: List[str],
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: List[Any] = [],
function_calling_llm: Any = None,
respect_context_window: bool = False,
request_within_rpm_limit: Any = None,
callbacks: List[Any] = [],
):
self._i18n: I18N = I18N()
self.llm = llm
self.task = task
self.agent = agent
self.crew = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools
self.step_callback = step_callback
self.use_stop_words = use_stop_words
self.tools_description = tools_description
self.function_calling_llm = function_calling_llm
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.ask_for_human_input = False
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.have_forced_answer = False
self.name_to_tool_map = {tool.name: tool for tool in self.tools}
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
self.messages.append(self._format_msg(system_prompt, role="system"))
self.messages.append(self._format_msg(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(self._format_msg(user_prompt))
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
formatted_answer = self._invoke_loop()
if self.ask_for_human_input:
human_feedback = self._ask_human_input(formatted_answer.output)
if self.crew and self.crew._train:
self._handle_crew_training_output(formatted_answer, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.ask_for_human_input = False
self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
formatted_answer = self._invoke_loop()
return {"output": formatted_answer.output}
def _invoke_loop(self, formatted_answer=None):
try:
while not isinstance(formatted_answer, AgentFinish):
if not self.request_within_rpm_limit or self.request_within_rpm_limit():
answer = LLM(
self.llm,
stop=self.stop if self.use_stop_words else None,
callbacks=self.callbacks,
).call(self.messages)
if not self.use_stop_words:
try:
self._format_answer(answer)
except OutputParserException as e:
if (
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE
in e.error
):
answer = answer.split("Observation:")[0].strip()
self.iterations += 1
formatted_answer = self._format_answer(answer)
if isinstance(formatted_answer, AgentAction):
action_result = self._use_tool(formatted_answer)
formatted_answer.text += f"\nObservation: {action_result}"
formatted_answer.result = action_result
self._show_logs(formatted_answer)
if self.step_callback:
self.step_callback(formatted_answer)
if self._should_force_answer():
if self.have_forced_answer:
return AgentFinish(
output=self._i18n.errors(
"force_final_answer_error"
).format(formatted_answer.text),
text=formatted_answer.text,
)
else:
formatted_answer.text += (
f'\n{self._i18n.errors("force_final_answer")}'
)
self.have_forced_answer = True
self.messages.append(
self._format_msg(formatted_answer.text, role="assistant")
)
except OutputParserException as e:
self.messages.append({"role": "assistant", "content": e.error})
return self._invoke_loop(formatted_answer)
except Exception as e:
if LLMContextLengthExceededException(str(e))._is_context_limit_error(
str(e)
):
self._handle_context_length()
return self._invoke_loop(formatted_answer)
else:
raise e
self._show_logs(formatted_answer)
return formatted_answer
def _show_start_logs(self):
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
self._printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.agent.role}\033[00m"
)
self._printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{self.task.description}\033[00m"
)
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
if isinstance(formatted_answer, AgentAction):
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
formatted_json = json.dumps(
json.loads(formatted_answer.tool_input),
indent=2,
ensure_ascii=False,
)
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.agent.role}\033[00m"
)
if thought and thought != "":
self._printer.print(
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
)
self._printer.print(
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
)
elif isinstance(formatted_answer, AgentFinish):
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.agent.role}\033[00m"
)
self._printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m"
)
def _use_tool(self, agent_action: AgentAction) -> Any:
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
)
tool_calling = tool_usage.parse(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
tool_result = tool_calling.message
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
return tool_result
def _summarize_messages(self) -> None:
llm = LLM(self.llm)
messages_groups = []
for message in self.messages:
content = message["content"]
for i in range(0, len(content), 5000):
messages_groups.append(content[i : i + 5000])
summarized_contents = []
for group in messages_groups:
summary = llm.call(
[
self._format_msg(
self._i18n.slices("summarizer_system_message"), role="system"
),
self._format_msg(
self._i18n.errors("sumamrize_instruction").format(group=group),
),
]
)
summarized_contents.append(summary)
merged_summary = " ".join(str(content) for content in summarized_contents)
self.messages = [
self._format_msg(
self._i18n.errors("summary").format(merged_summary=merged_summary)
)
]
def _handle_context_length(self) -> None:
if self.respect_context_window:
self._logger.log(
"debug",
"Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
self._summarize_messages()
else:
self._logger.log(
"debug",
"Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.agent.id)
if (
CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.ask_for_human_input
):
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if training_data.get(agent_id):
if self.crew is not None and hasattr(self.crew, "_train_iteration"):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = result.output
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
else:
self._logger.log(
"error",
"Invalid crew or missing _train_iteration attribute.",
color="red",
)
if self.ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": result.output,
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.agent.role,
}
if self.crew is not None and hasattr(self.crew, "_train_iteration"):
train_iteration = self.crew._train_iteration
if isinstance(train_iteration, int):
CrewTrainingHandler(TRAINING_DATA_FILE).append(
train_iteration, agent_id, training_data
)
else:
self._logger.log(
"error",
"Invalid train iteration type. Expected int.",
color="red",
)
else:
self._logger.log(
"error",
"Crew is None or does not have _train_iteration attribute.",
color="red",
)
def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
prompt = prompt.replace("{input}", inputs["input"])
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
prompt = prompt.replace("{tools}", inputs["tools"])
return prompt
def _format_answer(self, answer: str) -> Union[AgentAction, AgentFinish]:
return CrewAgentParser(agent=self.agent).parse(answer)
def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
return {"role": role, "content": prompt}

View File

@@ -1,397 +0,0 @@
import threading
import time
from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union
import click
from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.logger import Logger
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
_i18n: I18N = I18N()
should_ask_for_human_input: bool = False
llm: Any = None
iterations: int = 0
task: Any = None
tools_description: str = ""
tools_names: str = ""
original_tools: List[Any] = []
crew_agent: Any = None
crew: Any = None
function_calling_llm: Any = None
request_within_rpm_limit: Any = None
tools_handler: Optional[InstanceOf[ToolsHandler]] = None
max_iterations: Optional[int] = 15
have_forced_answer: bool = False
force_answer_max_iterations: Optional[int] = None # type: ignore # Incompatible types in assignment (expression has type "int | None", base class "CrewAgentExecutorMixin" defined the type as "int")
step_callback: Optional[Any] = None
system_template: Optional[str] = None
prompt_template: Optional[str] = None
response_template: Optional[str] = None
_logger: Logger = Logger()
_fit_context_window_strategy: Optional[Literal["summarize"]] = "summarize"
def _call(
self,
inputs: Dict[str, str],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name.casefold() for tool in self.tools],
excluded_colors=["green", "red"],
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Allowing human input given task setting
if self.task and self.task.human_input:
self.should_ask_for_human_input = True
# Let's start tracking the number of iterations and time elapsed
self.iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
while self._should_continue(self.iterations, time_elapsed):
if not self.request_within_rpm_limit or self.request_within_rpm_limit():
next_step_output = self._take_next_step(
name_to_tool_map,
color_mapping,
inputs,
intermediate_steps,
run_manager=run_manager,
)
if self.step_callback:
self.step_callback(next_step_output)
if isinstance(next_step_output, AgentFinish):
# Creating long term memory
create_long_term_memory = threading.Thread(
target=self._create_long_term_memory, args=(next_step_output,)
)
create_long_term_memory.start()
return self._return(
next_step_output, intermediate_steps, run_manager=run_manager
)
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
return self._return(
tool_return, intermediate_steps, run_manager=run_manager
)
self.iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
return self._return(output, intermediate_steps, run_manager=run_manager)
def _iter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""Take a single step in the thought-action-observation loop.
Override this to take control of how the agent makes and acts on choices.
"""
try:
if self._should_force_answer():
error = self._i18n.errors("force_final_answer")
output = AgentAction("_Exception", error, error)
self.have_forced_answer = True
yield AgentStep(action=output, observation=error)
return
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = f"\n{str(e.observation)}"
str(e.llm_output)
else:
observation = ""
elif isinstance(self.handle_parsing_errors, str):
observation = f"\n{self.handle_parsing_errors}"
elif callable(self.handle_parsing_errors):
observation = f"\n{self.handle_parsing_errors(e)}"
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, "")
if run_manager:
run_manager.on_agent_action(output, color="green")
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = ExceptionTool().run(
output.tool_input,
verbose=False,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
if self._should_force_answer():
error = self._i18n.errors("force_final_answer")
output = AgentAction("_Exception", error, error)
yield AgentStep(action=output, observation=error)
return
yield AgentStep(action=output, observation=observation)
return
except Exception as e:
if LLMContextLengthExceededException(str(e))._is_context_limit_error(
str(e)
):
output = self._handle_context_length_error(
intermediate_steps, run_manager, inputs
)
if isinstance(output, AgentFinish):
yield output
elif isinstance(output, list):
for step in output:
yield step
return
raise e
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
if self.should_ask_for_human_input:
human_feedback = self._ask_human_input(output.return_values["output"])
if self.crew and self.crew._train:
self._handle_crew_training_output(output, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.should_ask_for_human_input = False
action = AgentAction(
tool="Human Input", tool_input=human_feedback, log=output.log
)
yield AgentStep(
action=action,
observation=self._i18n.slice("human_feedback").format(
human_feedback=human_feedback
),
)
return
else:
if self.crew and self.crew._train:
self._handle_crew_training_output(output)
yield output
return
self._create_short_term_memory(output)
actions: List[AgentAction]
actions = [output] if isinstance(output, AgentAction) else output
yield from actions
for agent_action in actions:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
tool_usage = ToolUsage(
tools_handler=self.tools_handler, # type: ignore # Argument "tools_handler" to "ToolUsage" has incompatible type "ToolsHandler | None"; expected "ToolsHandler"
tools=self.tools, # type: ignore # Argument "tools" to "ToolUsage" has incompatible type "Sequence[BaseTool]"; expected "list[BaseTool]"
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task,
agent=self.crew_agent,
action=agent_action,
)
tool_calling = tool_usage.parse(agent_action.log)
if isinstance(tool_calling, ToolUsageErrorException):
observation = tool_calling.message
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in name_to_tool_map
]:
observation = tool_usage.use(tool_calling, agent_action.log)
else:
observation = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
yield AgentStep(action=agent_action, observation=observation)
def _handle_crew_training_output(
self, output: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.crew_agent.id)
if (
CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.should_ask_for_human_input
):
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if training_data.get(agent_id):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = output.return_values["output"]
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
if self.should_ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": output.return_values["output"],
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.crew_agent.role,
}
CrewTrainingHandler(TRAINING_DATA_FILE).append(
self.crew._train_iteration, agent_id, training_data
)
def _handle_context_length(
self, intermediate_steps: List[Tuple[AgentAction, str]]
) -> List[Tuple[AgentAction, str]]:
text = intermediate_steps[0][1]
original_action = intermediate_steps[0][0]
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n"],
chunk_size=8000,
chunk_overlap=500,
)
if self._fit_context_window_strategy == "summarize":
docs = text_splitter.create_documents([text])
self._logger.log(
"debug",
"Summarizing Content, it is recommended to use a RAG tool",
color="bold_blue",
)
summarize_chain = load_summarize_chain(
self.llm, chain_type="map_reduce", verbose=True
)
summarized_docs = []
for doc in docs:
summary = summarize_chain.invoke(
{"input_documents": [doc]}, return_only_outputs=True
)
summarized_docs.append(summary["output_text"])
formatted_results = "\n\n".join(summarized_docs)
summary_step = AgentStep(
action=AgentAction(
tool=original_action.tool,
tool_input=original_action.tool_input,
log=original_action.log,
),
observation=formatted_results,
)
summary_tuple = (summary_step.action, summary_step.observation)
return [summary_tuple]
return intermediate_steps
def _handle_context_length_error(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun],
inputs: Dict[str, str],
) -> Union[AgentFinish, List[AgentStep]]:
self._logger.log(
"debug",
"Context length exceeded. Asking user if they want to use summarize prompt to fit, this will reduce context length.",
color="yellow",
)
user_choice = click.confirm(
"Context length exceeded. Do you want to summarize the text to fit models context window?"
)
if user_choice:
self._logger.log(
"debug",
"Context length exceeded. Using summarize prompt to fit, this will reduce context length.",
color="bold_blue",
)
intermediate_steps = self._handle_context_length(intermediate_steps)
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
if isinstance(output, AgentFinish):
return output
elif isinstance(output, AgentAction):
return [AgentStep(action=output, observation=None)]
else:
return [AgentStep(action=action, observation=None) for action in output]
else:
self._logger.log(
"debug",
"Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)

View File

@@ -1,10 +1,6 @@
import re
from typing import Any, Union
from json_repair import repair_json
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.exceptions import OutputParserException
from crewai.utilities import I18N
@@ -14,7 +10,39 @@ MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE = "I did it wrong. Invalid Forma
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = "I did it wrong. Tried to both perform Action and give a Final Answer at the same time, I must do one or the other"
class CrewAgentParser(ReActSingleInputOutputParser):
class AgentAction:
thought: str
tool: str
tool_input: str
text: str
result: str
def __init__(self, thought: str, tool: str, tool_input: str, text: str):
self.thought = thought
self.tool = tool
self.tool_input = tool_input
self.text = text
class AgentFinish:
thought: str
output: str
text: str
def __init__(self, thought: str, output: str, text: str):
self.thought = thought
self.output = output
self.text = text
class OutputParserException(Exception):
error: str
def __init__(self, error: str):
self.error = error
class CrewAgentParser:
"""Parses ReAct-style LLM calls that have a single tool input.
Expects output to be in one of two formats.
@@ -38,7 +66,11 @@ class CrewAgentParser(ReActSingleInputOutputParser):
_i18n: I18N = I18N()
agent: Any = None
def __init__(self, agent: Any):
self.agent = agent
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
thought = self._extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
regex = (
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
@@ -47,7 +79,7 @@ class CrewAgentParser(ReActSingleInputOutputParser):
if action_match:
if includes_answer:
raise OutputParserException(
f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}: {text}"
f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}"
)
action = action_match.group(1)
clean_action = self._clean_action(action)
@@ -57,30 +89,23 @@ class CrewAgentParser(ReActSingleInputOutputParser):
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(clean_action, safe_tool_input, text)
return AgentAction(thought, clean_action, safe_tool_input, text)
elif includes_answer:
return AgentFinish(
{"output": text.split(FINAL_ANSWER_ACTION)[-1].strip()}, text
)
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
return AgentFinish(thought, final_answer, text)
if not re.search(r"Action\s*\d*\s*:[\s]*(.*?)", text, re.DOTALL):
self.agent.increment_formatting_errors()
raise OutputParserException(
f"Could not parse LLM output: `{text}`",
observation=f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{self._i18n.slice('final_answer_format')}",
llm_output=text,
send_to_llm=True,
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{self._i18n.slice('final_answer_format')}",
)
elif not re.search(
r"[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", text, re.DOTALL
):
self.agent.increment_formatting_errors()
raise OutputParserException(
f"Could not parse LLM output: `{text}`",
observation=MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
llm_output=text,
send_to_llm=True,
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
format = self._i18n.slice("format_without_tools")
@@ -88,11 +113,15 @@ class CrewAgentParser(ReActSingleInputOutputParser):
self.agent.increment_formatting_errors()
raise OutputParserException(
error,
observation=error,
llm_output=text,
send_to_llm=True,
)
def _extract_thought(self, text: str) -> str:
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
thought_match = re.search(regex, text, re.DOTALL)
if thought_match:
return thought_match.group(1).strip()
return ""
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return re.sub(r"^\s*\*+\s*|\s*\*+\s*$", "", text).strip()