It's working but needs a massive cleanup

This commit is contained in:
Brandon Hancock
2025-03-25 13:38:52 -04:00
parent 998afcd498
commit 06854fff86
7 changed files with 355 additions and 302 deletions


@@ -4,7 +4,7 @@ import re
import uuid
from typing import Any, Callable, Dict, List, Optional, Type, Union, cast
-from pydantic import BaseModel, Field, PrivateAttr, model_validator
+from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
@@ -12,17 +12,18 @@ from crewai.agents.cache import CacheHandler
from crewai.agents.parser import (
AgentAction,
AgentFinish,
CrewAgentParser,
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N
from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
get_tool_names,
handle_max_iterations_exceeded,
@@ -32,12 +33,16 @@ from crewai.utilities.agent_utils import (
render_text_description_and_args,
)
from crewai.utilities.events.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
-from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
+from crewai.utilities.events.tool_usage_events import (
+    ToolUsageErrorEvent,
+    ToolUsageStartedEvent,
+)
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.printer import Printer
from crewai.utilities.token_counter_callback import TokenCalcHandler
@@ -103,7 +108,9 @@ class LiteAgent(BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Goal of the agent")
backstory: str = Field(description="Backstory of the agent")
-    llm: LLM = Field(description="Language model that will run the agent")
+    llm: Union[str, InstanceOf[LLM], Any] = Field(
+        description="Language model that will run the agent"
+    )
tools: List[BaseTool] = Field(
default_factory=list, description="Tools at agent's disposal"
)
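Note: the widened `llm` field above now accepts a plain model string, an existing `LLM` instance, or a provider object, and the `setup_llm` validator below normalizes whatever was passed. A minimal usage sketch; the `LiteAgent` import path and constructor arguments here are illustrative assumptions, not confirmed by this diff:

    from crewai.lite_agent import LiteAgent  # assumed import path
    from crewai.llm import LLM

    # A bare model name is normalized to an LLM by the validator.
    agent_a = LiteAgent(
        role="Researcher",
        goal="Summarize findings",
        backstory="An experienced analyst.",
        llm="gpt-4o",
    )

    # An LLM instance is used as-is.
    agent_b = LiteAgent(
        role="Researcher",
        goal="Summarize findings",
        backstory="An experienced analyst.",
        llm=LLM(model="gpt-4o", temperature=0.2),
    )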
@@ -119,15 +126,17 @@ class LiteAgent(BaseModel):
response_format: Optional[Type[BaseModel]] = Field(
default=None, description="Pydantic model for structured output"
)
step_callback: Optional[Any] = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
)
tools_results: List[Dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
respect_context_window: bool = Field(
default=True,
description="Whether to respect the context window of the LLM",
)
callbacks: List[Callable] = Field(
default=[], description="Callbacks to be used for the agent"
)
+    _parsed_tools: List[CrewStructuredTool] = PrivateAttr(default_factory=list)
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
_cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler)
_times_executed: int = PrivateAttr(default=0)
@@ -143,6 +152,7 @@ class LiteAgent(BaseModel):
_delegations: Dict[str, int] = PrivateAttr(default_factory=dict)
# Internationalization
_printer: Printer = PrivateAttr(default_factory=Printer)
+    i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
request_within_rpm_limit: Optional[Callable[[], bool]] = Field(
default=None,
@@ -152,16 +162,31 @@ class LiteAgent(BaseModel):
default=True,
description="Whether to use stop words to prevent the LLM from using tools",
)
tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = Field(
default_factory=dict,
description="Mapping of tool names to tool instances",
)
@model_validator(mode="after")
def setup_llm(self):
"""Set up the LLM and other components after initialization."""
if self.llm is None:
raise ValueError("LLM must be provided")
-        self.llm = create_llm(self.llm)
+        if not isinstance(self.llm, LLM):
+            self.llm = create_llm(self.llm)
-        self.use_stop_words = self.llm.supports_stop_words()
+            raise ValueError("Unable to create LLM instance")
# Initialize callbacks
token_callback = TokenCalcHandler(token_cost_process=self._token_process)
self._callbacks = [token_callback]
return self
@model_validator(mode="after")
def parse_tools(self):
"""Parse the tools and convert them to CrewStructuredTool instances."""
self._parsed_tools = parse_tools(self.tools)
# Initialize tool name to tool mapping
self.tool_name_to_tool_map = {tool.name: tool for tool in self._parsed_tools}
return self
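The two validators above follow a normalize-then-verify pattern: coerce whatever the caller passed into the canonical type, then fail fast if coercion did not produce one. A self-contained sketch of the same pattern with stand-in types (no crewai imports; `FakeLLM` and `create_fake_llm` are hypothetical):

    from typing import Any, Union
    from pydantic import BaseModel, ConfigDict, model_validator

    class FakeLLM:
        def __init__(self, model: str):
            self.model = model

    def create_fake_llm(value: Any) -> FakeLLM:
        # Stand-in for crewai.utilities.llm_utils.create_llm
        return FakeLLM(str(value))

    class Holder(BaseModel):
        model_config = ConfigDict(arbitrary_types_allowed=True)
        llm: Union[str, FakeLLM, Any]

        @model_validator(mode="after")
        def setup_llm(self):
            if self.llm is None:
                raise ValueError("LLM must be provided")
            if not isinstance(self.llm, FakeLLM):
                self.llm = create_fake_llm(self.llm)
            if not isinstance(self.llm, FakeLLM):
                raise ValueError("Unable to create LLM instance")
            return self

    holder = Holder(llm="gpt-4o")
    assert isinstance(holder.llm, FakeLLM)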
@@ -177,14 +202,14 @@ class LiteAgent(BaseModel):
def _get_default_system_prompt(self) -> str:
"""Get the default system prompt for the agent."""
-        if self.tools:
+        if self._parsed_tools:
# Use the prompt template for agents with tools
return self.i18n.slice("lite_agent_system_prompt_with_tools").format(
role=self.role,
backstory=self.backstory,
goal=self.goal,
-                tools=render_text_description_and_args(self.tools),
-                tool_names=get_tool_names(self.tools),
+                tools=render_text_description_and_args(self._parsed_tools),
+                tool_names=get_tool_names(self._parsed_tools),
)
else:
# Use the prompt template for agents without tools
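For context, the with-tools branch above interpolates a rendered tool catalog into the prompt slice. A rough sketch of that assembly with a local stand-in renderer; the real `render_text_description_and_args` lives in `crewai.utilities.agent_utils` and its exact output format is not shown in this diff:

    # Hypothetical, minimal stand-ins for the prompt pieces used above.
    def render_tools(tools):
        return "\n".join(f"{t['name']}: {t['description']}" for t in tools)

    template = (
        "You are {role}. {backstory}\n"
        "Your personal goal is: {goal}\n"
        "You have access to the following tools:\n{tools}\n"
        "Tool names: {tool_names}"
    )

    tools = [{"name": "search", "description": "Search the web for a query."}]
    print(template.format(
        role="Researcher",
        backstory="An experienced analyst.",
        goal="Summarize findings",
        tools=render_tools(tools),
        tool_names=", ".join(t["name"] for t in tools),
    ))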
@@ -211,102 +236,6 @@ class LiteAgent(BaseModel):
return formatted_messages
-    def _extract_structured_output(self, text: str) -> Optional[BaseModel]:
-        """Extract structured output from text if response_format is set."""
-        if not self.response_format:
-            return None
-        try:
-            # Try to extract JSON from the text
-            json_match = re.search(r"```json\s*([\s\S]*?)\s*```", text)
-            if json_match:
-                json_str = json_match.group(1)
-                json_data = json.loads(json_str)
-            else:
-                # Try to parse the entire text as JSON
-                try:
-                    json_data = json.loads(text)
-                except json.JSONDecodeError:
-                    # If that fails, use a more lenient approach to find JSON-like content
-                    potential_json = re.search(r"(\{[\s\S]*\})", text)
-                    if potential_json:
-                        json_data = json.loads(potential_json.group(1))
-                    else:
-                        return None
-            # Convert to Pydantic model
-            return self.response_format.model_validate(json_data)
-        except Exception as e:
-            if self.verbose:
-                print(f"Error extracting structured output: {e}")
-            return None
-    def _preprocess_model_output(self, text: str) -> str:
-        """Preprocess the model output to correct common formatting issues."""
-        # Skip if the text is empty
-        if not text or text.strip() == "":
-            return "Thought: I need to provide an answer.\n\nFinal Answer: I don't have enough information to provide a complete answer."
-        # Remove 'Action' or 'Final Answer' from anywhere after a proper Thought
-        if "Thought:" in text and ("Action:" in text and "Final Answer:" in text):
-            # This is a case where both Action and Final Answer appear - clear conflict
-            # Check which one appears first and keep only that one
-            action_index = text.find("Action:")
-            final_answer_index = text.find("Final Answer:")
-            if action_index != -1 and final_answer_index != -1:
-                if action_index < final_answer_index:
-                    # Keep only the Action part
-                    text = text[:final_answer_index]
-                else:
-                    # Keep only the Final Answer part
-                    text = text[:action_index] + text[final_answer_index:]
-                if self.verbose:
-                    print("Removed conflicting Action/Final Answer parts")
-        # Check if this looks like a tool usage attempt without proper formatting
-        if any(tool.name in text for tool in self.tools) and "Action:" not in text:
-            # Try to extract tool name and input
-            for tool in self.tools:
-                if tool.name in text:
-                    # Find the tool name in the text
-                    parts = text.split(tool.name, 1)
-                    if len(parts) > 1:
-                        # Try to extract input as JSON
-                        input_text = parts[1]
-                        json_match = re.search(r"(\{[\s\S]*\})", input_text)
-                        if json_match:
-                            # Construct a properly formatted response
-                            formatted = "Thought: I need to use a tool to help with this task.\n\n"
-                            formatted += f"Action: {tool.name}\n\n"
-                            formatted += f"Action Input: {json_match.group(1)}\n"
-                            if self.verbose:
-                                print(f"Reformatted tool usage: {tool.name}")
-                            return formatted
-        # Check if this looks like a final answer without proper formatting
-        if (
-            "Final Answer:" not in text
-            and not any(tool.name in text for tool in self.tools)
-            and "Action:" not in text
-        ):
-            # This might be a direct response, format it as a final answer
-            # Don't format if text already has a "Thought:" section
-            if "Thought:" not in text:
-                formatted = "Thought: I can now provide the final answer.\n\n"
-                formatted += f"Final Answer: {text}\n"
-                if self.verbose:
-                    print("Reformatted as final answer")
-                return formatted
-        return text
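Both helpers above are deleted in this commit (the kickoff_async rewrite below leaves pydantic output as a TODO). For reference, the JSON-extraction cascade the first one implemented — fenced ```json block first, then the whole text, then the outermost braces, validated against the Pydantic model — works standalone like this (`Answer` is a hypothetical model):

    import json
    import re
    from typing import Optional
    from pydantic import BaseModel

    class Answer(BaseModel):
        score: int

    def extract_structured(text: str) -> Optional[Answer]:
        try:
            # 1. Prefer an explicit ```json fenced block.
            fenced = re.search(r"```json\s*([\s\S]*?)\s*```", text)
            if fenced:
                return Answer.model_validate(json.loads(fenced.group(1)))
            # 2. Fall back to parsing the whole text as JSON.
            try:
                return Answer.model_validate(json.loads(text))
            except json.JSONDecodeError:
                # 3. Last resort: grab the outermost brace-delimited span.
                braces = re.search(r"(\{[\s\S]*\})", text)
                if braces:
                    return Answer.model_validate(json.loads(braces.group(1)))
            return None
        except Exception:
            return None

    print(extract_structured('Final Answer: ```json\n{"score": 7}\n```'))  # score=7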
def kickoff(self, messages: Union[str, List[Dict[str, str]]]) -> LiteAgentOutput:
"""
Execute the agent with the given messages.
@@ -347,7 +276,7 @@ class LiteAgent(BaseModel):
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": self.tools,
"tools": self._parsed_tools,
"verbose": self.verbose,
}
@@ -356,7 +285,7 @@ class LiteAgent(BaseModel):
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
-                tools=self.tools,
+                tools=self._parsed_tools,
messages=messages,
),
)
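These bus emissions can be observed from outside the agent. A hedged sketch of a listener, following the decorator-style registration crewai's event-bus docs describe; treat the exact subscription API as an assumption, not something this diff confirms:

    from crewai.utilities.events.crewai_event_bus import crewai_event_bus
    from crewai.utilities.events.agent_events import LiteAgentExecutionStartedEvent

    @crewai_event_bus.on(LiteAgentExecutionStartedEvent)
    def log_lite_agent_start(source, event):
        # source is the emitting LiteAgent; agent_info carries role/goal/backstory
        print(f"LiteAgent started: {event.agent_info['role']}")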
@@ -364,61 +293,29 @@ class LiteAgent(BaseModel):
try:
# Execute the agent using invoke loop
result = await self._invoke()
-            # Extract structured output if response_format is set
-            pydantic_output = None
-            if self.response_format:
-                structured_output = self._extract_structured_output(result)
-                if isinstance(structured_output, BaseModel):
-                    pydantic_output = structured_output
-            # Create output object
-            usage_metrics = {}
-            if hasattr(self._token_process, "get_summary"):
-                usage_metrics_obj = self._token_process.get_summary()
-                if isinstance(usage_metrics_obj, UsageMetrics):
-                    usage_metrics = usage_metrics_obj.model_dump()
-            output = LiteAgentOutput(
-                raw=result,
-                pydantic=pydantic_output,
-                agent_role=self.role,
-                usage_metrics=usage_metrics,
+        except AssertionError:
+            self._printer.print(
+                content="Agent failed to reach a final answer. This is likely a bug - please report it.",
+                color="red",
+            )
-            # Emit event for agent execution completion
-            crewai_event_bus.emit(
-                self,
-                event=LiteAgentExecutionCompletedEvent(
-                    agent_info=agent_info,
-                    output=result,
-                ),
-            )
-            return output
+            raise
except Exception as e:
# Emit event for agent execution error
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
+            self._handle_unknown_error(e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
else:
raise e
-            # Retry if we haven't exceeded the retry limit
-            self._times_executed += 1
-            if self._times_executed <= self._max_retry_limit:
-                if self.verbose:
-                    print(
-                        f"Retrying agent execution ({self._times_executed}/{self._max_retry_limit})..."
-                    )
-                return await self.kickoff_async(messages)
+        # TODO: CREATE AND RETURN LiteAgentOutput
+        return LiteAgentOutput(
+            raw=result.text,
+            pydantic=None,  # TODO: Add pydantic output
+            agent_role=self.role,
+            usage_metrics=None,  # TODO: Add usage metrics
+        )
-            raise e
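The deleted branch above held the capped-retry behavior (re-run up to `_max_retry_limit` times, never for litellm errors); the rewrite currently just re-raises and leaves the output fields as TODOs. The retry pattern in isolation, with local stand-ins rather than crewai API:

    import asyncio

    MAX_RETRY_LIMIT = 2

    async def run_once(attempt: int) -> str:
        # Stand-in for one _invoke pass: fail twice, then succeed.
        if attempt < 2:
            raise RuntimeError("transient failure")
        return "ok"

    async def kickoff_with_retries() -> str:
        times_executed = 0
        while True:
            try:
                return await run_once(times_executed)
            except Exception as e:
                if e.__class__.__module__.startswith("litellm"):
                    raise  # provider errors are never retried
                times_executed += 1
                if times_executed > MAX_RETRY_LIMIT:
                    raise
                print(f"Retrying agent execution ({times_executed}/{MAX_RETRY_LIMIT})...")

    print(asyncio.run(kickoff_with_retries()))  # retries twice, then prints "ok"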
-    async def _invoke(self) -> str:
+    async def _invoke(self) -> AgentFinish:
"""
Run the agent's thought process until it reaches a conclusion or max iterations.
Similar to _invoke_loop in CrewAgentExecutor.
@@ -426,18 +323,8 @@ class LiteAgent(BaseModel):
Returns:
AgentFinish: The final result of the agent execution.
"""
-        # # Set up tools handler for tool execution
-        # tools_handler = ToolsHandler(cache=self._cache_handler)
-        # TODO: MOVE TO INIT
-        # Set up callbacks for token tracking
-        token_callback = TokenCalcHandler(token_cost_process=self._token_process)
-        callbacks = [token_callback]
-        # # Prepare tool configurations
-        # parsed_tools = parse_tools(self.tools)
-        # tools_description = render_text_description_and_args(parsed_tools)
-        # tools_names = get_tool_names(parsed_tools)
+        # Use the stored callbacks
+        callbacks = self._callbacks
# Execute the agent loop
formatted_answer = None
@@ -449,14 +336,14 @@ class LiteAgent(BaseModel):
printer=self._printer,
i18n=self.i18n,
messages=self._messages,
-                llm=self.llm,
+                llm=cast(LLM, self.llm),
callbacks=callbacks,
)
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
-                llm=self.llm,
+                llm=cast(LLM, self.llm),
messages=self._messages,
callbacks=callbacks,
printer=self._printer,
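`request_within_rpm_limit` is just a zero-argument predicate; `enforce_rpm_limit` consults it before each LLM call. One plausible implementation is a sliding-window gate (the 60-second window policy here is an assumption for illustration):

    import time
    from collections import deque

    def make_rpm_gate(max_rpm: int):
        calls = deque()

        def within_limit() -> bool:
            now = time.monotonic()
            while calls and now - calls[0] > 60.0:
                calls.popleft()  # forget calls older than one minute
            if len(calls) < max_rpm:
                calls.append(now)
                return True
            return False

        return within_limit

    gate = make_rpm_gate(max_rpm=10)
    print(gate())  # True until ten calls land inside the same minute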
@@ -471,11 +358,31 @@ class LiteAgent(BaseModel):
formatted_answer, tool_result
)
self._invoke_step_callback(formatted_answer)
self._append_message(formatted_answer.text, role="assistant")
except OutputParserException as e:
formatted_answer = self._handle_output_parser_exception(e)
except Exception as e:
print(f"Error: {e}")
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
if self._is_context_length_exceeded(e):
self._handle_context_length()
continue
else:
self._handle_unknown_error(e)
raise e
finally:
self._iterations += 1
# During the invoke loop, formatted_answer alternates between AgentAction
# (when the agent is using tools) and eventually becomes AgentFinish
# (when the agent reaches a final answer). This assertion confirms we've
# reached a final answer and helps type checking understand this transition.
assert isinstance(formatted_answer, AgentFinish)
self._show_logs(formatted_answer)
return formatted_answer
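The closing assertion encodes the loop invariant: `formatted_answer` holds `AgentAction` values while tools are in play, and only the final iteration produces `AgentFinish`. A toy version of that contract with stand-in dataclasses:

    from dataclasses import dataclass
    from typing import Optional, Union

    @dataclass
    class Action:
        tool: str

    @dataclass
    class Finish:
        output: str

    def step(prev: Optional[Union[Action, Finish]]) -> Union[Action, Finish]:
        # Stand-in for one parse-and-maybe-use-a-tool iteration.
        return Action(tool="search") if prev is None else Finish(output="done")

    answer: Optional[Union[Action, Finish]] = None
    while not isinstance(answer, Finish):
        answer = step(answer)

    assert isinstance(answer, Finish)  # mirrors the assert in _invoke
    print(answer.output)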
def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult:
try:
@@ -490,12 +397,12 @@ class LiteAgent(BaseModel):
),
)
tool_usage = ToolUsage(
-                tools=self.tools,
-                original_tools=self.tools,  # TODO: INVESTIGATE DIFF BETWEEN THIS AND ABOVE
-                tools_description=render_text_description_and_args(self.tools),
-                tools_names=get_tool_names(self.tools),
                agent=self,
+                tools=self._parsed_tools,
action=agent_action,
+                tools_handler=None,
+                task=None,
+                function_calling_llm=None,
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
@@ -504,9 +411,9 @@ class LiteAgent(BaseModel):
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
-                    name.casefold().strip() for name in self.tool_name_to_tool_map
+                    tool.name.casefold().strip() for tool in self._parsed_tools
] or tool_calling.tool_name.casefold().replace("_", " ") in [
-                    name.casefold().strip() for name in self.tool_name_to_tool_map
+                    tool.name.casefold().strip() for tool in self._parsed_tools
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
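The membership test above is deliberately fuzzy: names are case-folded, trimmed, and matched a second time with underscores treated as spaces, so `Web_Search` finds a tool registered as `web search`. Extracted (lightly tidied) as a plain function:

    def matches_known_tool(requested: str, known_names: list) -> bool:
        normalized = requested.casefold().strip()
        known = [name.casefold().strip() for name in known_names]
        return normalized in known or normalized.replace("_", " ") in known

    print(matches_known_tool("Web_Search", ["web search"]))  # True
    print(matches_known_tool("calculator", ["web search"]))  # False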
@@ -515,23 +422,164 @@ class LiteAgent(BaseModel):
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
-                tool_result = self._i18n.errors("wrong_tool_name").format(
+                tool_result = self.i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
tools=", ".join(
[tool.name.casefold() for tool in self._parsed_tools]
),
)
return ToolResult(result=tool_result, result_as_answer=False)
except Exception as e:
-            if self.agent:
-                crewai_event_bus.emit(
-                    self,
-                    event=ToolUsageErrorEvent(  # validation error
-                        agent_key=self.agent.key,
-                        agent_role=self.agent.role,
-                        tool_name=agent_action.tool,
-                        tool_args=agent_action.tool_input,
-                        tool_class=agent_action.tool,
-                        error=str(e),
-                    ),
-                )
+            crewai_event_bus.emit(
+                self,
+                event=ToolUsageErrorEvent(
+                    agent_key=self.key,
+                    agent_role=self.role,
+                    tool_name=agent_action.tool,
+                    tool_args=agent_action.tool_input,
+                    tool_class=agent_action.tool,
+                    error=str(e),
+                ),
+            )
raise e
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
formatted_answer.text += f"\nObservation: {tool_result.result}"
formatted_answer.result = tool_result.result
if tool_result.result_as_answer:
return AgentFinish(
thought="",
output=tool_result.result,
text=formatted_answer.text,
)
self._show_logs(formatted_answer)
return formatted_answer
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
if self.verbose:
agent_role = self.role.split("\n")[0]
if isinstance(formatted_answer, AgentAction):
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
formatted_json = json.dumps(
formatted_answer.tool_input,
indent=2,
ensure_ascii=False,
)
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
if thought and thought != "":
self._printer.print(
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
)
self._printer.print(
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
)
elif isinstance(formatted_answer, AgentFinish):
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
self._printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self._messages.append(format_message_for_llm(text, role=role))
def _handle_output_parser_exception(self, e: OutputParserException) -> AgentAction:
"""Handle OutputParserException by updating messages and formatted_answer."""
self._messages.append({"role": "user", "content": e.error})
formatted_answer = AgentAction(
text=e.error,
tool="",
tool_input="",
thought="",
)
MAX_ITERATIONS = 3
if self._iterations > MAX_ITERATIONS:
self._printer.print(
content=f"Error parsing LLM output, agent will retry: {e.error}",
color="red",
)
return formatted_answer
def _is_context_length_exceeded(self, exception: Exception) -> bool:
"""Check if the exception is due to context length exceeding."""
return LLMContextLengthExceededException(
str(exception)
)._is_context_limit_error(str(exception))
def _handle_context_length(self) -> None:
if self.respect_context_window:
self._printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
self._summarize_messages()
else:
self._printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
def _summarize_messages(self) -> None:
messages_groups = []
for message in self.messages:
content = message["content"]
cut_size = cast(LLM, self.llm).get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append(content[i : i + cut_size])
summarized_contents = []
for group in messages_groups:
summary = cast(LLM, self.llm).call(
[
format_message_for_llm(
self.i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
self.i18n.slice("summarize_instruction").format(group=group),
),
],
callbacks=self.callbacks,
)
summarized_contents.append(summary)
merged_summary = " ".join(str(content) for content in summarized_contents)
self.messages = [
format_message_for_llm(
self.i18n.slice("summary").format(merged_summary=merged_summary)
)
]
def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user."""
self._printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
self._printer.print(
content=f"Error details: {exception}",
color="red",
)
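`_summarize_messages` shrinks the history by slicing every message into context-window-sized chunks, summarizing each chunk with the LLM, and replacing the whole history with one merged summary. The same flow in a condensed, dependency-free form (`summarize` stands in for the LLM call, and the replacement message wording is an assumption):

    def summarize(chunk: str) -> str:
        return chunk[:40]  # stand-in: the real code asks the LLM for a summary

    def summarize_messages(messages, window_size: int):
        groups = []
        for message in messages:
            content = message["content"]
            # Slice each message into window-sized chunks.
            for i in range(0, len(content), window_size):
                groups.append(content[i : i + window_size])
        merged = " ".join(summarize(group) for group in groups)
        # Replace the entire history with a single summary message.
        return [{"role": "user", "content": f"Summary of our conversation so far: {merged}"}]

    history = [{"role": "assistant", "content": "alpha " * 30}]
    print(summarize_messages(history, window_size=50))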