mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
WIP: generated summary from documents split, could also create memgpt approach
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
|
||||
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
|
||||
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
|
||||
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
|
||||
|
||||
@@ -10,6 +10,10 @@ from langchain_core.exceptions import OutputParserException
|
||||
from langchain_core.tools import BaseTool
|
||||
from langchain_core.utils.input import get_color_mapping
|
||||
from pydantic import InstanceOf
|
||||
import tiktoken
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
from langchain.chains.summarize import load_summarize_chain
|
||||
from openai import BadRequestError
|
||||
|
||||
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
|
||||
from crewai.agents.tools_handler import ToolsHandler
|
||||
@@ -40,6 +44,8 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
|
||||
system_template: Optional[str] = None
|
||||
prompt_template: Optional[str] = None
|
||||
response_template: Optional[str] = None
|
||||
retry_summarize: bool = False
|
||||
retry_summarize_count: int = 2
|
||||
|
||||
def _call(
|
||||
self,
|
||||
@@ -120,138 +126,198 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
|
||||
|
||||
Override this to take control of how the agent makes and acts on choices.
|
||||
"""
|
||||
try:
|
||||
if self._should_force_answer():
|
||||
error = self._i18n.errors("force_final_answer")
|
||||
output = AgentAction("_Exception", error, error)
|
||||
self.have_forced_answer = True
|
||||
yield AgentStep(action=output, observation=error)
|
||||
return
|
||||
for attempt in range(self.retry_summarize_count):
|
||||
try:
|
||||
if self._should_force_answer():
|
||||
error = self._i18n.errors("force_final_answer")
|
||||
output = AgentAction("_Exception", error, error)
|
||||
self.have_forced_answer = True
|
||||
yield AgentStep(action=output, observation=error)
|
||||
return
|
||||
|
||||
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
|
||||
|
||||
# Call the LLM to see what to do.
|
||||
output = self.agent.plan( # type: ignore # Incompatible types in assignment (expression has type "AgentAction | AgentFinish | list[AgentAction]", variable has type "AgentAction")
|
||||
intermediate_steps,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**inputs,
|
||||
)
|
||||
|
||||
except OutputParserException as e:
|
||||
if isinstance(self.handle_parsing_errors, bool):
|
||||
raise_error = not self.handle_parsing_errors
|
||||
else:
|
||||
raise_error = False
|
||||
if raise_error:
|
||||
raise ValueError(
|
||||
"An output parsing error occurred. "
|
||||
"In order to pass this error back to the agent and have it try "
|
||||
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
|
||||
f"This is the error: {str(e)}"
|
||||
intermediate_steps = self._prepare_intermediate_steps(
|
||||
intermediate_steps
|
||||
)
|
||||
str(e)
|
||||
if isinstance(self.handle_parsing_errors, bool):
|
||||
if e.send_to_llm:
|
||||
observation = f"\n{str(e.observation)}"
|
||||
str(e.llm_output)
|
||||
else:
|
||||
observation = ""
|
||||
elif isinstance(self.handle_parsing_errors, str):
|
||||
observation = f"\n{self.handle_parsing_errors}"
|
||||
elif callable(self.handle_parsing_errors):
|
||||
observation = f"\n{self.handle_parsing_errors(e)}"
|
||||
else:
|
||||
raise ValueError("Got unexpected type of `handle_parsing_errors`")
|
||||
output = AgentAction("_Exception", observation, "")
|
||||
|
||||
if run_manager:
|
||||
run_manager.on_agent_action(output, color="green")
|
||||
|
||||
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
|
||||
observation = ExceptionTool().run(
|
||||
output.tool_input,
|
||||
verbose=False,
|
||||
color=None,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**tool_run_kwargs,
|
||||
)
|
||||
|
||||
if self._should_force_answer():
|
||||
error = self._i18n.errors("force_final_answer")
|
||||
output = AgentAction("_Exception", error, error)
|
||||
yield AgentStep(action=output, observation=error)
|
||||
return
|
||||
|
||||
yield AgentStep(action=output, observation=observation)
|
||||
return
|
||||
|
||||
# If the tool chosen is the finishing tool, then we end and return.
|
||||
if isinstance(output, AgentFinish):
|
||||
if self.should_ask_for_human_input:
|
||||
human_feedback = self._ask_human_input(output.return_values["output"])
|
||||
|
||||
if self.crew and self.crew._train:
|
||||
self._handle_crew_training_output(output, human_feedback)
|
||||
|
||||
# Making sure we only ask for it once, so disabling for the next thought loop
|
||||
self.should_ask_for_human_input = False
|
||||
action = AgentAction(
|
||||
tool="Human Input", tool_input=human_feedback, log=output.log
|
||||
)
|
||||
|
||||
yield AgentStep(
|
||||
action=action,
|
||||
observation=self._i18n.slice("human_feedback").format(
|
||||
human_feedback=human_feedback
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
else:
|
||||
if self.crew and self.crew._train:
|
||||
self._handle_crew_training_output(output)
|
||||
|
||||
yield output
|
||||
return
|
||||
|
||||
self._create_short_term_memory(output)
|
||||
|
||||
actions: List[AgentAction]
|
||||
actions = [output] if isinstance(output, AgentAction) else output
|
||||
yield from actions
|
||||
|
||||
for agent_action in actions:
|
||||
if run_manager:
|
||||
run_manager.on_agent_action(agent_action, color="green")
|
||||
|
||||
tool_usage = ToolUsage(
|
||||
tools_handler=self.tools_handler, # type: ignore # Argument "tools_handler" to "ToolUsage" has incompatible type "ToolsHandler | None"; expected "ToolsHandler"
|
||||
tools=self.tools, # type: ignore # Argument "tools" to "ToolUsage" has incompatible type "Sequence[BaseTool]"; expected "list[BaseTool]"
|
||||
original_tools=self.original_tools,
|
||||
tools_description=self.tools_description,
|
||||
tools_names=self.tools_names,
|
||||
function_calling_llm=self.function_calling_llm,
|
||||
task=self.task,
|
||||
agent=self.crew_agent,
|
||||
action=agent_action,
|
||||
)
|
||||
tool_calling = tool_usage.parse(agent_action.log)
|
||||
|
||||
if isinstance(tool_calling, ToolUsageErrorException):
|
||||
observation = tool_calling.message
|
||||
else:
|
||||
if tool_calling.tool_name.casefold().strip() in [
|
||||
name.casefold().strip() for name in name_to_tool_map
|
||||
] or tool_calling.tool_name.casefold().replace("_", " ") in [
|
||||
name.casefold().strip() for name in name_to_tool_map
|
||||
]:
|
||||
observation = tool_usage.use(tool_calling, agent_action.log)
|
||||
else:
|
||||
observation = self._i18n.errors("wrong_tool_name").format(
|
||||
tool=tool_calling.tool_name,
|
||||
tools=", ".join([tool.name.casefold() for tool in self.tools]),
|
||||
if self.retry_summarize:
|
||||
encoding = tiktoken.encoding_for_model(self.llm.model_name)
|
||||
original_token_count = len(
|
||||
encoding.encode(intermediate_steps[0][1])
|
||||
)
|
||||
yield AgentStep(action=agent_action, observation=observation)
|
||||
if original_token_count > 8000:
|
||||
print(
|
||||
"BEFORE AGENT PLAN TOKEN LENGTH",
|
||||
original_token_count,
|
||||
)
|
||||
text = intermediate_steps[0][1]
|
||||
|
||||
text_splitter = RecursiveCharacterTextSplitter(
|
||||
separators=["\n\n", "\n"],
|
||||
chunk_size=8000,
|
||||
chunk_overlap=500,
|
||||
)
|
||||
docs = text_splitter.create_documents([text])
|
||||
print("DOCS", docs)
|
||||
print("DOCS length", len(docs))
|
||||
breakpoint()
|
||||
# TODO: store to vector db - using memgpt like strategy
|
||||
summary_chain = load_summarize_chain(
|
||||
self.llm, chain_type="map_reduce", verbose=True
|
||||
)
|
||||
summary = summary_chain.run(docs)
|
||||
|
||||
print("SUMMARY:", summary)
|
||||
|
||||
intermediate_steps[0] = (intermediate_steps[0][0], summary)
|
||||
|
||||
# Call the LLM to see what to do.
|
||||
output = self.agent.plan( # type: ignore # Incompatible types in assignment (expression has type "AgentAction | AgentFinish | list[AgentAction]", variable has type "AgentAction")
|
||||
intermediate_steps,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**inputs,
|
||||
)
|
||||
|
||||
except OutputParserException as e:
|
||||
if isinstance(self.handle_parsing_errors, bool):
|
||||
raise_error = not self.handle_parsing_errors
|
||||
else:
|
||||
raise_error = False
|
||||
if raise_error:
|
||||
raise ValueError(
|
||||
"An output parsing error occurred. "
|
||||
"In order to pass this error back to the agent and have it try "
|
||||
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
|
||||
f"This is the error: {str(e)}"
|
||||
)
|
||||
str(e)
|
||||
if isinstance(self.handle_parsing_errors, bool):
|
||||
if e.send_to_llm:
|
||||
observation = f"\n{str(e.observation)}"
|
||||
str(e.llm_output)
|
||||
else:
|
||||
observation = ""
|
||||
elif isinstance(self.handle_parsing_errors, str):
|
||||
observation = f"\n{self.handle_parsing_errors}"
|
||||
elif callable(self.handle_parsing_errors):
|
||||
observation = f"\n{self.handle_parsing_errors(e)}"
|
||||
else:
|
||||
raise ValueError("Got unexpected type of `handle_parsing_errors`")
|
||||
output = AgentAction("_Exception", observation, "")
|
||||
|
||||
if run_manager:
|
||||
run_manager.on_agent_action(output, color="green")
|
||||
|
||||
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
|
||||
observation = ExceptionTool().run(
|
||||
output.tool_input,
|
||||
verbose=False,
|
||||
color=None,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**tool_run_kwargs,
|
||||
)
|
||||
|
||||
if self._should_force_answer():
|
||||
error = self._i18n.errors("force_final_answer")
|
||||
output = AgentAction("_Exception", error, error)
|
||||
yield AgentStep(action=output, observation=error)
|
||||
return
|
||||
|
||||
yield AgentStep(action=output, observation=observation)
|
||||
return
|
||||
|
||||
except BadRequestError as e:
|
||||
print("bad request string str(e)", str(e))
|
||||
if (
|
||||
"context_length_exceeded" in str(e)
|
||||
and attempt < self.retry_summarize_count - 1
|
||||
):
|
||||
print(
|
||||
f"Context length exceeded. Retrying with summarization (attempt {attempt + 1})..."
|
||||
)
|
||||
self.retry_summarize = True
|
||||
breakpoint()
|
||||
continue
|
||||
else:
|
||||
print("Error now raising occurred in _iter_next_step:", e)
|
||||
raise e
|
||||
|
||||
except Exception as e:
|
||||
print("Error occurred in _iter_next_step:", e)
|
||||
raise e
|
||||
|
||||
# If the tool chosen is the finishing tool, then we end and return.
|
||||
if isinstance(output, AgentFinish):
|
||||
if self.should_ask_for_human_input:
|
||||
human_feedback = self._ask_human_input(
|
||||
output.return_values["output"]
|
||||
)
|
||||
|
||||
if self.crew and self.crew._train:
|
||||
self._handle_crew_training_output(output, human_feedback)
|
||||
|
||||
# Making sure we only ask for it once, so disabling for the next thought loop
|
||||
self.should_ask_for_human_input = False
|
||||
action = AgentAction(
|
||||
tool="Human Input", tool_input=human_feedback, log=output.log
|
||||
)
|
||||
|
||||
yield AgentStep(
|
||||
action=action,
|
||||
observation=self._i18n.slice("human_feedback").format(
|
||||
human_feedback=human_feedback
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
else:
|
||||
if self.crew and self.crew._train:
|
||||
self._handle_crew_training_output(output)
|
||||
|
||||
yield output
|
||||
return
|
||||
|
||||
self._create_short_term_memory(output)
|
||||
|
||||
actions: List[AgentAction]
|
||||
actions = [output] if isinstance(output, AgentAction) else output
|
||||
yield from actions
|
||||
|
||||
for agent_action in actions:
|
||||
if run_manager:
|
||||
run_manager.on_agent_action(agent_action, color="green")
|
||||
|
||||
tool_usage = ToolUsage(
|
||||
tools_handler=self.tools_handler, # type: ignore # Argument "tools_handler" to "ToolUsage" has incompatible type "ToolsHandler | None"; expected "ToolsHandler"
|
||||
tools=self.tools, # type: ignore # Argument "tools" to "ToolUsage" has incompatible type "Sequence[BaseTool]"; expected "list[BaseTool]"
|
||||
original_tools=self.original_tools,
|
||||
tools_description=self.tools_description,
|
||||
tools_names=self.tools_names,
|
||||
function_calling_llm=self.function_calling_llm,
|
||||
task=self.task,
|
||||
agent=self.crew_agent,
|
||||
action=agent_action,
|
||||
)
|
||||
|
||||
# print("tool_usage", tool_usage)
|
||||
tool_calling = tool_usage.parse(agent_action.log)
|
||||
# print("tool_calling", tool_calling)
|
||||
|
||||
if isinstance(tool_calling, ToolUsageErrorException):
|
||||
observation = tool_calling.message
|
||||
else:
|
||||
if tool_calling.tool_name.casefold().strip() in [
|
||||
name.casefold().strip() for name in name_to_tool_map
|
||||
] or tool_calling.tool_name.casefold().replace("_", " ") in [
|
||||
name.casefold().strip() for name in name_to_tool_map
|
||||
]:
|
||||
observation = tool_usage.use(tool_calling, agent_action.log)
|
||||
else:
|
||||
observation = self._i18n.errors("wrong_tool_name").format(
|
||||
tool=tool_calling.tool_name,
|
||||
tools=", ".join(
|
||||
[tool.name.casefold() for tool in self.tools]
|
||||
),
|
||||
)
|
||||
yield AgentStep(action=agent_action, observation=observation)
|
||||
|
||||
def _handle_crew_training_output(
|
||||
self, output: AgentFinish, human_feedback: str | None = None
|
||||
|
||||
@@ -16,7 +16,7 @@ try:
|
||||
except ImportError:
|
||||
agentops = None
|
||||
|
||||
OPENAI_BIGGER_MODELS = ["gpt-4"]
|
||||
OPENAI_BIGGER_MODELS = ["gpt-4o"]
|
||||
|
||||
|
||||
class ToolUsageErrorException(Exception):
|
||||
|
||||
Reference in New Issue
Block a user