Merge branch 'main' into bug_fix

This commit is contained in:
Lucas Gomide
2025-04-09 09:43:28 -03:00
committed by GitHub
68 changed files with 25625 additions and 12627 deletions

View File

@@ -18,6 +18,11 @@ from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities.agent_utils import (
get_tool_names,
parse_tools,
render_text_description_and_args,
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
@@ -86,9 +91,6 @@ class Agent(BaseAgent):
response_template: Optional[str] = Field(
default=None, description="Response format for the agent."
)
tools_results: Optional[List[Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
@@ -205,6 +207,7 @@ class Agent(BaseAgent):
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._user_memory,
self.crew._external_memory,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":
@@ -300,12 +303,12 @@ class Agent(BaseAgent):
Returns:
An instance of the CrewAgentExecutor class.
"""
tools = tools or self.tools or []
parsed_tools = self._parse_tools(tools)
raw_tools: List[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt = Prompts(
agent=self,
tools=tools,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -327,12 +330,12 @@ class Agent(BaseAgent):
crew=self.crew,
tools=parsed_tools,
prompt=prompt,
original_tools=tools,
original_tools=raw_tools,
stop_words=stop_words,
max_iter=self.max_iter,
tools_handler=self.tools_handler,
tools_names=self.__tools_names(parsed_tools),
tools_description=self._render_text_description_and_args(parsed_tools),
tools_names=get_tool_names(parsed_tools),
tools_description=render_text_description_and_args(parsed_tools),
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
@@ -367,25 +370,6 @@ class Agent(BaseAgent):
def get_output_converter(self, llm, text, model, instructions):
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _parse_tools(self, tools: List[Any]) -> List[Any]: # type: ignore
"""Parse tools to be used for the task."""
tools_list = []
try:
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
from crewai.tools import BaseTool as CrewAITool
for tool in tools:
if isinstance(tool, CrewAITool):
tools_list.append(tool.to_structured_tool())
else:
tools_list.append(tool)
except ModuleNotFoundError:
tools_list = []
for tool in tools:
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
@@ -431,23 +415,6 @@ class Agent(BaseAgent):
return description
def _render_text_description_and_args(self, tools: List[BaseTool]) -> str:
"""Render the tool name, description, and args in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search, args: {"query": {"type": "string"}}
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
"""
tool_strings = []
for tool in tools:
tool_strings.append(tool.description)
return "\n".join(tool_strings)
def _validate_docker_installation(self) -> None:
"""Check if Docker is installed and running."""
if not shutil.which("docker"):
@@ -467,10 +434,6 @@ class Agent(BaseAgent):
f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
)
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])
def __repr__(self):
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@@ -483,3 +446,6 @@ class Agent(BaseAgent):
Fingerprint: The agent's fingerprint
"""
return self.security_config.fingerprint
def set_fingerprint(self, fingerprint: Fingerprint):
self.security_config.fingerprint = fingerprint

View File

@@ -2,7 +2,7 @@ import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import (
UUID4,
@@ -72,8 +72,6 @@ class BaseAgent(ABC, BaseModel):
Interpolate inputs into the agent description and backstory.
set_cache_handler(cache_handler: CacheHandler) -> None:
Set the cache handler for the agent.
increment_formatting_errors() -> None:
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
@@ -91,9 +89,6 @@ class BaseAgent(ABC, BaseModel):
_original_backstory: Optional[str] = PrivateAttr(default=None)
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
formatting_errors: int = Field(
default=0, description="Number of formatting errors."
)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
@@ -135,6 +130,9 @@ class BaseAgent(ABC, BaseModel):
default_factory=ToolsHandler,
description="An instance of the ToolsHandler class.",
)
tools_results: List[Dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
max_tokens: Optional[int] = Field(
default=None, description="Maximum number of tokens for the agent's execution."
)
@@ -153,6 +151,9 @@ class BaseAgent(ABC, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
callbacks: List[Callable] = Field(
default=[], description="Callbacks to be used for the agent"
)
@model_validator(mode="before")
@classmethod
@@ -254,10 +255,6 @@ class BaseAgent(ABC, BaseModel):
def create_agent_executor(self, tools=None) -> None:
pass
@abstractmethod
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
@@ -356,9 +353,6 @@ class BaseAgent(ABC, BaseModel):
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.

View File

@@ -1,5 +1,5 @@
import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@@ -15,9 +15,9 @@ if TYPE_CHECKING:
class CrewAgentExecutorMixin:
crew: Optional["Crew"]
agent: Optional["BaseAgent"]
task: Optional["Task"]
crew: "Crew"
agent: "BaseAgent"
task: "Task"
iterations: int
max_iter: int
_i18n: I18N
@@ -47,6 +47,27 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to short term memory: {e}")
pass
def _create_external_memory(self, output) -> None:
"""Create and save a external-term memory item if conditions are met."""
if (
self.crew
and self.agent
and self.task
and hasattr(self.crew, "_external_memory")
and self.crew._external_memory
):
try:
self.crew._external_memory.save(
value=output.text,
metadata={
"description": self.task.description,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to external memory: {e}")
pass
def _create_long_term_memory(self, output) -> None:
"""Create and save long-term and entity memory items based on evaluation."""
if (

View File

@@ -1,41 +1,40 @@
import json
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import (
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
AgentAction,
AgentFinish,
CrewAgentParser,
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
handle_agent_action_core,
handle_context_length,
handle_max_iterations_exceeded,
handle_output_parser_exception,
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
process_llm_response,
show_agent_logs,
)
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import (
ToolUsageErrorEvent,
crewai_event_bus,
)
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.logger import Logger
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
@dataclass
class ToolResult:
result: Any
result_as_answer: bool
class CrewAgentExecutor(CrewAgentExecutorMixin):
_logger: Logger = Logger()
@@ -47,7 +46,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[BaseTool],
tools: List[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
tools_description: str,
@@ -83,7 +82,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, BaseTool] = {
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
@@ -99,11 +98,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
self.messages.append(self._format_msg(system_prompt, role="system"))
self.messages.append(self._format_msg(user_prompt))
self.messages.append(format_message_for_llm(system_prompt, role="system"))
self.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(self._format_msg(user_prompt))
self.messages.append(format_message_for_llm(user_prompt))
self._show_start_logs()
@@ -118,7 +117,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
raise
except Exception as e:
self._handle_unknown_error(e)
handle_unknown_error(self._printer, e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
@@ -130,6 +129,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
def _invoke_loop(self) -> AgentFinish:
@@ -140,16 +140,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
try:
if self._has_reached_max_iterations():
formatted_answer = self._handle_max_iterations_exceeded(
formatted_answer
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
formatted_answer,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
break
self._enforce_rpm_limit()
enforce_rpm_limit(self.request_within_rpm_limit)
answer = self._get_llm_response()
formatted_answer = self._process_llm_response(answer)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
)
formatted_answer = process_llm_response(answer, self.use_stop_words)
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
@@ -165,8 +174,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
}
tool_result = self._execute_tool_and_check_finality(
formatted_answer, fingerprint_context=fingerprint_context
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
fingerprint_context=fingerprint_context,
tools=self.tools,
i18n=self._i18n,
agent_key=self.agent.key if self.agent else None,
agent_role=self.agent.role if self.agent else None,
tools_handler=self.tools_handler,
task=self.task,
agent=self.agent,
function_calling_llm=self.function_calling_llm,
)
formatted_answer = self._handle_agent_action(
formatted_answer, tool_result
@@ -176,17 +194,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._append_message(formatted_answer.text, role="assistant")
except OutputParserException as e:
formatted_answer = self._handle_output_parser_exception(e)
formatted_answer = handle_output_parser_exception(
e=e,
messages=self.messages,
iterations=self.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
if self._is_context_length_exceeded(e):
self._handle_context_length()
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
else:
self._handle_unknown_error(e)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -199,89 +230,27 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user."""
self._printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
self._printer.print(
content=f"Error details: {exception}",
color="red",
)
def _has_reached_max_iterations(self) -> bool:
"""Check if the maximum number of iterations has been reached."""
return self.iterations >= self.max_iter
def _enforce_rpm_limit(self) -> None:
"""Enforce the requests per minute (RPM) limit if applicable."""
if self.request_within_rpm_limit:
self.request_within_rpm_limit()
def _get_llm_response(self) -> str:
"""Call the LLM and return the response, handling any invalid responses."""
try:
answer = self.llm.call(
self.messages,
callbacks=self.callbacks,
)
except Exception as e:
self._printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
self._printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return answer
def _process_llm_response(self, answer: str) -> Union[AgentAction, AgentFinish]:
"""Process the LLM response and format it into an AgentAction or AgentFinish."""
if not self.use_stop_words:
try:
# Preliminary parsing to check for errors.
self._format_answer(answer)
except OutputParserException as e:
if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
answer = answer.split("Observation:")[0].strip()
return self._format_answer(answer)
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
isinstance(add_image_tool, dict)
and formatted_answer.tool.casefold().strip()
== add_image_tool.get("name", "").casefold().strip()
):
self.messages.append(tool_result.result)
return formatted_answer # Continue the loop
self.messages.append({"role": "assistant", "content": tool_result.result})
return formatted_answer
if self.step_callback:
self.step_callback(tool_result)
formatted_answer.text += f"\nObservation: {tool_result.result}"
formatted_answer.result = tool_result.result
if tool_result.result_as_answer:
return AgentFinish(
thought="",
output=tool_result.result,
text=formatted_answer.text,
)
self._show_logs(formatted_answer)
return formatted_answer
return handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,
messages=self.messages,
step_callback=self.step_callback,
show_logs=self._show_logs,
)
def _invoke_step_callback(self, formatted_answer) -> None:
"""Invoke the step callback if it exists."""
@@ -290,175 +259,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self.messages.append(self._format_msg(text, role=role))
def _handle_output_parser_exception(self, e: OutputParserException) -> AgentAction:
"""Handle OutputParserException by updating messages and formatted_answer."""
self.messages.append({"role": "user", "content": e.error})
formatted_answer = AgentAction(
text=e.error,
tool="",
tool_input="",
thought="",
)
if self.iterations > self.log_error_after:
self._printer.print(
content=f"Error parsing LLM output, agent will retry: {e.error}",
color="red",
)
return formatted_answer
def _is_context_length_exceeded(self, exception: Exception) -> bool:
"""Check if the exception is due to context length exceeding."""
return LLMContextLengthExceededException(
str(exception)
)._is_context_limit_error(str(exception))
self.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self):
"""Show logs for the start of agent execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
agent_role = self.agent.role.split("\n")[0]
self._printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
description = (
show_agent_logs(
printer=self._printer,
agent_role=self.agent.role,
task_description=(
getattr(self.task, "description") if self.task else "Not Found"
)
self._printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
)
),
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
)
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
"""Show logs for the agent's execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.agent.verbose or (
hasattr(self, "crew") and getattr(self.crew, "verbose", False)
):
agent_role = self.agent.role.split("\n")[0]
if isinstance(formatted_answer, AgentAction):
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
formatted_json = json.dumps(
formatted_answer.tool_input,
indent=2,
ensure_ascii=False,
)
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
if thought and thought != "":
self._printer.print(
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
)
self._printer.print(
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
)
self._printer.print(
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
)
elif isinstance(formatted_answer, AgentFinish):
self._printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
self._printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def _execute_tool_and_check_finality(
self,
agent_action: AgentAction,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
try:
fingerprint_context = fingerprint_context or {}
if self.agent:
# Create tool usage event with fingerprint information
event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
event_data.update(fingerprint_context)
# Emit the tool usage started event with agent information
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(**event_data),
)
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
fingerprint_context=fingerprint_context, # Pass fingerprint context
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
tool_result = tool_calling.message
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
return ToolResult(result=tool_result, result_as_answer=False)
except Exception as e:
# TODO: drop
if self.agent:
error_event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"error": str(e),
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
error_event_data.update(fingerprint_context)
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(**error_event_data),
)
raise e
show_agent_logs(
printer=self._printer,
agent_role=self.agent.role,
formatted_answer=formatted_answer,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
)
def _summarize_messages(self) -> None:
messages_groups = []
@@ -466,47 +293,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
content = message["content"]
cut_size = self.llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append(content[i : i + cut_size])
messages_groups.append({"content": content[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
summary = self.llm.call(
[
self._format_msg(
format_message_for_llm(
self._i18n.slice("summarizer_system_message"), role="system"
),
self._format_msg(
self._i18n.slice("summarize_instruction").format(group=group),
format_message_for_llm(
self._i18n.slice("summarize_instruction").format(
group=group["content"]
),
),
],
callbacks=self.callbacks,
)
summarized_contents.append(summary)
summarized_contents.append({"content": str(summary)})
merged_summary = " ".join(str(content) for content in summarized_contents)
merged_summary = " ".join(content["content"] for content in summarized_contents)
self.messages = [
self._format_msg(
format_message_for_llm(
self._i18n.slice("summary").format(merged_summary=merged_summary)
)
]
def _handle_context_length(self) -> None:
if self.respect_context_window:
self._printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
self._summarize_messages()
else:
self._printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: Optional[str] = None
) -> None:
@@ -559,13 +372,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
prompt = prompt.replace("{tools}", inputs["tools"])
return prompt
def _format_answer(self, answer: str) -> Union[AgentAction, AgentFinish]:
return CrewAgentParser(agent=self.agent).parse(answer)
def _format_msg(self, prompt: str, role: str = "user") -> Dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Handle human feedback with different flows for training vs regular use.
@@ -592,7 +398,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""Process feedback for training scenarios with single iteration."""
self._handle_crew_training_output(initial_answer, feedback)
self.messages.append(
self._format_msg(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
@@ -621,7 +427,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration."""
self.messages.append(
self._format_msg(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
@@ -646,45 +452,3 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
color="red",
)
def _handle_max_iterations_exceeded(self, formatted_answer):
"""
Handles the case when the maximum number of iterations is exceeded.
Performs one more LLM call to get the final answer.
Parameters:
formatted_answer: The last formatted answer from the agent.
Returns:
The final formatted answer after exceeding max iterations.
"""
self._printer.print(
content="Maximum iterations reached. Requesting final answer.",
color="yellow",
)
if formatted_answer and hasattr(formatted_answer, "text"):
assistant_message = (
formatted_answer.text + f'\n{self._i18n.errors("force_final_answer")}'
)
else:
assistant_message = self._i18n.errors("force_final_answer")
self.messages.append(self._format_msg(assistant_message, role="assistant"))
# Perform one more LLM call to get the final answer
answer = self.llm.call(
self.messages,
callbacks=self.callbacks,
)
if answer is None or answer == "":
self._printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
formatted_answer = self._format_answer(answer)
# Return the formatted answer, regardless of its type
return formatted_answer

View File

@@ -1,5 +1,5 @@
import re
from typing import Any, Union
from typing import Any, Optional, Union
from json_repair import repair_json
@@ -67,9 +67,23 @@ class CrewAgentParser:
_i18n: I18N = I18N()
agent: Any = None
def __init__(self, agent: Any):
def __init__(self, agent: Optional[Any] = None):
self.agent = agent
@staticmethod
def parse_text(text: str) -> Union[AgentAction, AgentFinish]:
"""
Static method to parse text into an AgentAction or AgentFinish without needing to instantiate the class.
Args:
text: The text to parse.
Returns:
Either an AgentAction or AgentFinish based on the parsed content.
"""
parser = CrewAgentParser()
return parser.parse(text)
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
thought = self._extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
@@ -77,22 +91,7 @@ class CrewAgentParser:
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
)
action_match = re.search(regex, text, re.DOTALL)
if action_match:
if includes_answer:
raise OutputParserException(
f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}"
)
action = action_match.group(1)
clean_action = self._clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(thought, clean_action, safe_tool_input, text)
elif includes_answer:
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
@@ -103,22 +102,30 @@ class CrewAgentParser:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought, final_answer, text)
elif action_match:
action = action_match.group(1)
clean_action = self._clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(thought, clean_action, safe_tool_input, text)
if not re.search(r"Action\s*\d*\s*:[\s]*(.*?)", text, re.DOTALL):
self.agent.increment_formatting_errors()
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{self._i18n.slice('final_answer_format')}",
)
elif not re.search(
r"[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", text, re.DOTALL
):
self.agent.increment_formatting_errors()
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
format = self._i18n.slice("format_without_tools")
error = f"{format}"
self.agent.increment_formatting_errors()
raise OutputParserException(
error,
)

View File

@@ -3,6 +3,10 @@ import subprocess
import click
# Be mindful about changing this.
# on some enviorments we don't use this command but instead uv sync directly
# so if you expect this to support more things you will need to replicate it there
# ask @joaomdmoura if you are unsure
def install_crew(proxy_options: list[str]) -> None:
"""
Install the crew by running the UV command to lock and install.

View File

@@ -28,6 +28,7 @@ from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM, BaseLLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.external.external_memory import ExternalMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.user.user_memory import UserMemory
@@ -105,6 +106,7 @@ class Crew(BaseModel):
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_user_memory: Optional[InstanceOf[UserMemory]] = PrivateAttr()
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
@@ -145,6 +147,10 @@ class Crew(BaseModel):
default=None,
description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.",
)
external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
default=None,
description="An Instance of the ExternalMemory to be used by the Crew",
)
embedder: Optional[dict] = Field(
default=None,
description="Configuration for the embedder to be used for the crew.",
@@ -289,6 +295,12 @@ class Crew(BaseModel):
if self.entity_memory
else EntityMemory(crew=self, embedder_config=self.embedder)
)
self._external_memory = (
# External memory doesnt support a default value since it was designed to be managed entirely externally
self.external_memory.set_crew(self)
if self.external_memory
else None
)
if (
self.memory_config
and "user_memory" in self.memory_config
@@ -1167,6 +1179,7 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_external_memory",
"_telemetry",
"agents",
"tasks",
@@ -1313,7 +1326,15 @@ class Crew(BaseModel):
RuntimeError: If memory reset operation fails.
"""
VALID_TYPES = frozenset(
["long", "short", "entity", "knowledge", "kickoff_outputs", "all"]
[
"long",
"short",
"entity",
"knowledge",
"kickoff_outputs",
"all",
"external",
]
)
if command_type not in VALID_TYPES:
@@ -1339,6 +1360,7 @@ class Crew(BaseModel):
memory_systems = [
("short term", getattr(self, "_short_term_memory", None)),
("entity", getattr(self, "_entity_memory", None)),
("external", getattr(self, "_external_memory", None)),
("long term", getattr(self, "_long_term_memory", None)),
("task output", getattr(self, "_task_output_handler", None)),
("knowledge", getattr(self, "knowledge", None)),
@@ -1366,6 +1388,7 @@ class Crew(BaseModel):
"entity": (self._entity_memory, "entity"),
"knowledge": (self.knowledge, "knowledge"),
"kickoff_outputs": (self._task_output_handler, "task output"),
"external": (self._external_memory, "external"),
}
memory_system, name = reset_functions[memory_type]

518
src/crewai/lite_agent.py Normal file
View File

@@ -0,0 +1,518 @@
import asyncio
import json
import re
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Type, Union, cast
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache import CacheHandler
from crewai.agents.parser import (
AgentAction,
AgentFinish,
OutputParserException,
)
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities import I18N
from crewai.utilities.agent_utils import (
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
get_tool_names,
handle_agent_action_core,
handle_context_length,
handle_max_iterations_exceeded,
handle_output_parser_exception,
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
parse_tools,
process_llm_response,
render_text_description_and_args,
show_agent_logs,
)
from crewai.utilities.converter import convert_to_model, generate_model_description
from crewai.utilities.events.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.printer import Printer
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.tool_utils import execute_tool_and_check_finality
class LiteAgentOutput(BaseModel):
"""Class that represents the result of a LiteAgent execution."""
model_config = {"arbitrary_types_allowed": True}
raw: str = Field(description="Raw output of the agent", default="")
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of the agent", default=None
)
agent_role: str = Field(description="Role of the agent that produced this output")
usage_metrics: Optional[Dict[str, Any]] = Field(
description="Token usage metrics for this execution", default=None
)
def to_dict(self) -> Dict[str, Any]:
"""Convert pydantic_output to a dictionary."""
if self.pydantic:
return self.pydantic.model_dump()
return {}
def __str__(self) -> str:
"""String representation of the output."""
if self.pydantic:
return str(self.pydantic)
return self.raw
class LiteAgent(BaseModel):
"""
A lightweight agent that can process messages and use tools.
This agent is simpler than the full Agent class, focusing on direct execution
rather than task delegation. It's designed to be used for simple interactions
where a full crew is not needed.
Attributes:
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
llm: The language model that will run the agent.
tools: Tools at the agent's disposal.
verbose: Whether the agent execution should be in verbose mode.
max_iterations: Maximum number of iterations for tool usage.
max_execution_time: Maximum execution time in seconds.
response_format: Optional Pydantic model for structured output.
"""
model_config = {"arbitrary_types_allowed": True}
# Core Agent Properties
role: str = Field(description="Role of the agent")
goal: str = Field(description="Goal of the agent")
backstory: str = Field(description="Backstory of the agent")
llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
default=None, description="Language model that will run the agent"
)
tools: List[BaseTool] = Field(
default_factory=list, description="Tools at agent's disposal"
)
# Execution Control Properties
max_iterations: int = Field(
default=15, description="Maximum number of iterations for tool usage"
)
max_execution_time: Optional[int] = Field(
default=None, description="Maximum execution time in seconds"
)
respect_context_window: bool = Field(
default=True,
description="Whether to respect the context window of the LLM",
)
use_stop_words: bool = Field(
default=True,
description="Whether to use stop words to prevent the LLM from using tools",
)
request_within_rpm_limit: Optional[Callable[[], bool]] = Field(
default=None,
description="Callback to check if the request is within the RPM limit",
)
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
# Output and Formatting Properties
response_format: Optional[Type[BaseModel]] = Field(
default=None, description="Pydantic model for structured output"
)
verbose: bool = Field(
default=False, description="Whether to print execution details"
)
callbacks: List[Callable] = Field(
default=[], description="Callbacks to be used for the agent"
)
# State and Results
tools_results: List[Dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
# Private Attributes
_parsed_tools: List[CrewStructuredTool] = PrivateAttr(default_factory=list)
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
_cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler)
_key: str = PrivateAttr(default_factory=lambda: str(uuid.uuid4()))
_messages: List[Dict[str, str]] = PrivateAttr(default_factory=list)
_iterations: int = PrivateAttr(default=0)
_printer: Printer = PrivateAttr(default_factory=Printer)
@model_validator(mode="after")
def setup_llm(self):
"""Set up the LLM and other components after initialization."""
self.llm = create_llm(self.llm)
if not isinstance(self.llm, LLM):
raise ValueError("Unable to create LLM instance")
# Initialize callbacks
token_callback = TokenCalcHandler(token_cost_process=self._token_process)
self._callbacks = [token_callback]
return self
@model_validator(mode="after")
def parse_tools(self):
"""Parse the tools and convert them to CrewStructuredTool instances."""
self._parsed_tools = parse_tools(self.tools)
return self
@property
def key(self) -> str:
"""Get the unique key for this agent instance."""
return self._key
@property
def _original_role(self) -> str:
"""Return the original role for compatibility with tool interfaces."""
return self.role
def kickoff(self, messages: Union[str, List[Dict[str, str]]]) -> LiteAgentOutput:
"""
Execute the agent with the given messages.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
# Create agent info for event emission
agent_info = {
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": self._parsed_tools,
"verbose": self.verbose,
}
try:
# Reset state for this run
self._iterations = 0
self.tools_results = []
# Format messages for the LLM
self._messages = self._format_messages(messages)
# Emit event for agent execution start
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=self._parsed_tools,
messages=messages,
),
)
# Execute the agent using invoke loop
agent_finish = self._invoke_loop()
formatted_result: Optional[BaseModel] = None
if self.response_format:
try:
# Cast to BaseModel to ensure type safety
result = self.response_format.model_validate_json(
agent_finish.output
)
if isinstance(result, BaseModel):
formatted_result = result
except Exception as e:
self._printer.print(
content=f"Failed to parse output into response format: {str(e)}",
color="yellow",
)
# Calculate token usage metrics
usage_metrics = self._token_process.get_summary()
# Create output
output = LiteAgentOutput(
raw=agent_finish.output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
)
# Emit completion event
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=agent_finish.output,
),
)
return output
except Exception as e:
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
handle_unknown_error(self._printer, e)
# Emit error event
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise e
async def kickoff_async(
self, messages: Union[str, List[Dict[str, str]]]
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
Returns:
LiteAgentOutput: The result of the agent execution.
"""
return await asyncio.to_thread(self.kickoff, messages)
def _get_default_system_prompt(self) -> str:
"""Get the default system prompt for the agent."""
base_prompt = ""
if self._parsed_tools:
# Use the prompt template for agents with tools
base_prompt = self.i18n.slice("lite_agent_system_prompt_with_tools").format(
role=self.role,
backstory=self.backstory,
goal=self.goal,
tools=render_text_description_and_args(self._parsed_tools),
tool_names=get_tool_names(self._parsed_tools),
)
else:
# Use the prompt template for agents without tools
base_prompt = self.i18n.slice(
"lite_agent_system_prompt_without_tools"
).format(
role=self.role,
backstory=self.backstory,
goal=self.goal,
)
# Add response format instructions if specified
if self.response_format:
schema = generate_model_description(self.response_format)
base_prompt += self.i18n.slice("lite_agent_response_format").format(
response_format=schema
)
return base_prompt
def _format_messages(
self, messages: Union[str, List[Dict[str, str]]]
) -> List[Dict[str, str]]:
"""Format messages for the LLM."""
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
system_prompt = self._get_default_system_prompt()
# Add system message at the beginning
formatted_messages = [{"role": "system", "content": system_prompt}]
# Add the rest of the messages
formatted_messages.extend(messages)
return formatted_messages
def _invoke_loop(self) -> AgentFinish:
"""
Run the agent's thought process until it reaches a conclusion or max iterations.
Returns:
AgentFinish: The final result of the agent execution.
"""
# Execute the agent loop
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
try:
if has_reached_max_iterations(self._iterations, self.max_iterations):
formatted_answer = handle_max_iterations_exceeded(
formatted_answer,
printer=self._printer,
i18n=self.i18n,
messages=self._messages,
llm=cast(LLM, self.llm),
callbacks=self._callbacks,
)
enforce_rpm_limit(self.request_within_rpm_limit)
# Emit LLM call started event
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=self._messages,
tools=None,
callbacks=self._callbacks,
),
)
try:
answer = get_llm_response(
llm=cast(LLM, self.llm),
messages=self._messages,
callbacks=self._callbacks,
printer=self._printer,
)
# Emit LLM call completed event
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
response=answer,
call_type=LLMCallType.LLM_CALL,
),
)
except Exception as e:
# Emit LLM call failed event
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e)),
)
raise e
formatted_answer = process_llm_response(answer, self.use_stop_words)
if isinstance(formatted_answer, AgentAction):
# Emit tool usage started event
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
),
)
try:
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
tools=self._parsed_tools,
i18n=self.i18n,
agent_key=self.key,
agent_role=self.role,
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
started_at=datetime.now(),
finished_at=datetime.now(),
output=tool_result.result,
),
)
except Exception as e:
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
agent_key=self.key,
agent_role=self.role,
tool_name=formatted_answer.tool,
tool_args=formatted_answer.tool_input,
tool_class=formatted_answer.tool,
error=str(e),
),
)
raise e
formatted_answer = handle_agent_action_core(
formatted_answer=formatted_answer,
tool_result=tool_result,
show_logs=self._show_logs,
)
self._append_message(formatted_answer.text, role="assistant")
except OutputParserException as e:
formatted_answer = handle_output_parser_exception(
e=e,
messages=self._messages,
iterations=self._iterations,
log_error_after=3,
printer=self._printer,
)
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self._messages,
llm=cast(LLM, self.llm),
callbacks=self._callbacks,
i18n=self.i18n,
)
continue
else:
handle_unknown_error(self._printer, e)
raise e
finally:
self._iterations += 1
assert isinstance(formatted_answer, AgentFinish)
self._show_logs(formatted_answer)
return formatted_answer
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
"""Show logs for the agent's execution."""
show_agent_logs(
printer=self._printer,
agent_role=self.role,
formatted_answer=formatted_answer,
verbose=self.verbose,
)
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self._messages.append(format_message_for_llm(text, role=role))

View File

@@ -2,5 +2,12 @@ from .entity.entity_memory import EntityMemory
from .long_term.long_term_memory import LongTermMemory
from .short_term.short_term_memory import ShortTermMemory
from .user.user_memory import UserMemory
from .external.external_memory import ExternalMemory
__all__ = ["UserMemory", "EntityMemory", "LongTermMemory", "ShortTermMemory"]
__all__ = [
"UserMemory",
"EntityMemory",
"LongTermMemory",
"ShortTermMemory",
"ExternalMemory",
]

View File

@@ -1,6 +1,12 @@
from typing import Any, Dict, Optional
from crewai.memory import EntityMemory, LongTermMemory, ShortTermMemory, UserMemory
from crewai.memory import (
EntityMemory,
ExternalMemory,
LongTermMemory,
ShortTermMemory,
UserMemory,
)
class ContextualMemory:
@@ -11,6 +17,7 @@ class ContextualMemory:
ltm: LongTermMemory,
em: EntityMemory,
um: UserMemory,
exm: ExternalMemory,
):
if memory_config is not None:
self.memory_provider = memory_config.get("provider")
@@ -20,6 +27,7 @@ class ContextualMemory:
self.ltm = ltm
self.em = em
self.um = um
self.exm = exm
def build_context_for_task(self, task, context) -> str:
"""
@@ -35,6 +43,7 @@ class ContextualMemory:
context.append(self._fetch_ltm_context(task.description))
context.append(self._fetch_stm_context(query))
context.append(self._fetch_entity_context(query))
context.append(self._fetch_external_context(query))
if self.memory_provider == "mem0":
context.append(self._fetch_user_context(query))
return "\n".join(filter(None, context))
@@ -106,3 +115,24 @@ class ContextualMemory:
f"- {result['memory']}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"
def _fetch_external_context(self, query: str) -> str:
"""
Fetches and formats relevant information from External Memory.
Args:
query (str): The search query to find relevant information.
Returns:
str: Formatted information as bullet points, or an empty string if none found.
"""
if self.exm is None:
return ""
external_memories = self.exm.search(query)
if not external_memories:
return ""
formatted_memories = "\n".join(
f"- {result['memory']}" for result in external_memories
)
return f"External memories:\n{formatted_memories}"

View File

View File

@@ -0,0 +1,61 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Self
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
if TYPE_CHECKING:
from crewai.memory.storage.mem0_storage import Mem0Storage
class ExternalMemory(Memory):
def __init__(self, storage: Optional[Storage] = None, **data: Any):
super().__init__(storage=storage, **data)
@staticmethod
def _configure_mem0(crew: Any, config: Dict[str, Any]) -> "Mem0Storage":
from crewai.memory.storage.mem0_storage import Mem0Storage
return Mem0Storage(type="external", crew=crew, config=config)
@staticmethod
def external_supported_storages() -> Dict[str, Any]:
return {
"mem0": ExternalMemory._configure_mem0,
}
@staticmethod
def create_storage(crew: Any, embedder_config: Optional[Dict[str, Any]]) -> Storage:
if not embedder_config:
raise ValueError("embedder_config is required")
if "provider" not in embedder_config:
raise ValueError("embedder_config must include a 'provider' key")
provider = embedder_config["provider"]
supported_storages = ExternalMemory.external_supported_storages()
if provider not in supported_storages:
raise ValueError(f"Provider {provider} not supported")
return supported_storages[provider](crew, embedder_config.get("config", {}))
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves a value into the external storage."""
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
def reset(self) -> None:
self.storage.reset()
def set_crew(self, crew: Any) -> Self:
super().set_crew(crew)
if not self.storage:
self.storage = self.create_storage(crew, self.embedder_config)
return self

View File

@@ -0,0 +1,13 @@
from typing import Any, Dict, Optional
class ExternalMemoryItem:
def __init__(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
):
self.value = value
self.metadata = metadata
self.agent = agent

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Self
from pydantic import BaseModel
@@ -9,6 +9,7 @@ class Memory(BaseModel):
"""
embedder_config: Optional[Dict[str, Any]] = None
crew: Optional[Any] = None
storage: Any
@@ -36,3 +37,7 @@ class Memory(BaseModel):
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)
def set_crew(self, crew: Any) -> Self:
self.crew = crew
return self

View File

@@ -11,15 +11,20 @@ class Mem0Storage(Storage):
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None):
def __init__(self, type, crew=None, config=None):
super().__init__()
if type not in ["user", "short_term", "long_term", "entities"]:
raise ValueError("Invalid type for Mem0Storage. Must be 'user' or 'agent'.")
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
self.memory_type = type
self.crew = crew
self.memory_config = crew.memory_config
self.config = config or {}
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# User ID is required for user memory type "user" since it's used as a unique identifier for the user.
user_id = self._get_user_id()
@@ -27,7 +32,7 @@ class Mem0Storage(Storage):
raise ValueError("User ID is required for user memory type")
# API key in memory config overrides the environment variable
config = self.memory_config.get("config", {})
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
@@ -56,26 +61,34 @@ class Mem0Storage(Storage):
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self._get_user_id()
agent_name = self._get_agent_name()
if self.memory_type == "user":
self.memory.add(value, user_id=user_id, metadata={**metadata})
elif self.memory_type == "short_term":
agent_name = self._get_agent_name()
self.memory.add(
value, agent_id=agent_name, metadata={"type": "short_term", **metadata}
)
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
elif self.memory_type == "long_term":
agent_name = self._get_agent_name()
self.memory.add(
value,
agent_id=agent_name,
infer=False,
metadata={"type": "long_term", **metadata},
)
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
entity_name = self._get_agent_name()
self.memory.add(
value, user_id=entity_name, metadata={"type": "entity", **metadata}
)
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
self.memory.add(value, **params | {"output_format": "v1.1"})
def search(
self,
@@ -84,41 +97,43 @@ class Mem0Storage(Storage):
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit}
if self.memory_type == "user":
user_id = self._get_user_id()
if user_id := self._get_user_id():
params["user_id"] = user_id
elif self.memory_type == "short_term":
agent_name = self._get_agent_name()
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
agent_name = self._get_agent_name()
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
agent_name = self._get_agent_name()
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
# Discard the filters for now since we create the filters
# automatically when the crew is created.
results = self.memory.search(**params)
return [r for r in results if r["score"] >= score_threshold]
def _get_user_id(self):
if self.memory_type == "user":
if hasattr(self, "memory_config") and self.memory_config is not None:
return self.memory_config.get("config", {}).get("user_id")
else:
return None
return None
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self):
agents = self.crew.agents if self.crew else []
def _get_agent_name(self) -> str:
if not self.crew:
return ""
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return agents
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
def reset(self):
if self.memory:
self.memory.reset()

View File

@@ -1,3 +1,4 @@
import warnings
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory
@@ -12,6 +13,12 @@ class UserMemory(Memory):
"""
def __init__(self, crew=None):
warnings.warn(
"UserMemory is deprecated and will be removed in a future version. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,
)
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
@@ -48,6 +55,4 @@ class UserMemory(Memory):
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the user memory: {e}"
)
raise Exception(f"An error occurred while resetting the user memory: {e}")

View File

@@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class ToolResult:
"""Result of tool execution."""
result: str
result_as_answer: bool = False

View File

@@ -2,10 +2,11 @@ import ast
import datetime
import json
import time
from dataclasses import dataclass
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
from typing import Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import json5
from json_repair import repair_json
@@ -13,19 +14,25 @@ from json_repair import repair_json
from crewai.agents.tools_handler import ToolsHandler
from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities import I18N, Converter, Printer
from crewai.utilities.agent_utils import (
get_tool_names,
render_text_description_and_args,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.lite_agent import LiteAgent
OPENAI_BIGGER_MODELS = [
"gpt-4",
"gpt-4o",
@@ -61,28 +68,24 @@ class ToolUsage:
def __init__(
self,
tools_handler: ToolsHandler,
tools: List[BaseTool],
original_tools: List[Any],
tools_description: str,
tools_names: str,
task: Task,
tools_handler: Optional[ToolsHandler],
tools: List[CrewStructuredTool],
task: Optional[Task],
function_calling_llm: Any,
agent: Any,
action: Any,
agent: Optional[Union["BaseAgent", "LiteAgent"]] = None,
action: Any = None,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> None:
self._i18n: I18N = agent.i18n
self._i18n: I18N = agent.i18n if agent else I18N()
self._printer: Printer = Printer()
self._telemetry: Telemetry = Telemetry()
self._run_attempts: int = 1
self._max_parsing_attempts: int = 3
self._remember_format_after_usages: int = 3
self.agent = agent
self.tools_description = tools_description
self.tools_names = tools_names
self.tools_description = render_text_description_and_args(tools)
self.tools_names = get_tool_names(tools)
self.tools_handler = tools_handler
self.original_tools = original_tools
self.tools = tools
self.task = task
self.action = action
@@ -106,17 +109,19 @@ class ToolUsage:
) -> str:
if isinstance(calling, ToolUsageErrorException):
error = calling.message
if self.agent.verbose:
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
self.task.increment_tools_errors()
if self.task:
self.task.increment_tools_errors()
return error
try:
tool = self._select_tool(calling.tool_name)
except Exception as e:
error = getattr(e, "message", str(e))
self.task.increment_tools_errors()
if self.agent.verbose:
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
@@ -130,8 +135,9 @@ class ToolUsage:
except Exception as e:
error = getattr(e, "message", str(e))
self.task.increment_tools_errors()
if self.agent.verbose:
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
@@ -140,9 +146,9 @@ class ToolUsage:
def _use(
self,
tool_string: str,
tool: Any,
tool: CrewStructuredTool,
calling: Union[ToolCalling, InstructorToolCalling],
) -> str: # TODO: Fix this return type
) -> str:
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
result = self._i18n.errors("task_repeated_usage").format(
@@ -157,24 +163,29 @@ class ToolUsage:
return result # type: ignore # Fix the return type of this function
except Exception:
self.task.increment_tools_errors()
if self.task:
self.task.increment_tools_errors()
started_at = time.time()
from_cache = False
result = None # type: ignore
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
# check if cache is available
if self.tools_handler.cache:
result = self.tools_handler.cache.read( # type: ignore # Incompatible types in assignment (expression has type "str | None", variable has type "str")
if self.tools_handler and self.tools_handler.cache:
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=calling.arguments
)
) # type: ignore
from_cache = result is not None
original_tool = next(
(ot for ot in self.original_tools if ot.name == tool.name), None
available_tool = next(
(
available_tool
for available_tool in self.tools
if available_tool.name == tool.name
),
None,
)
if result is None: #! finecwg: if not result --> if result is None
if result is None:
try:
if calling.tool_name in [
"Delegate work to coworker",
@@ -183,7 +194,8 @@ class ToolUsage:
coworker = (
calling.arguments.get("coworker") if calling.arguments else None
)
self.task.increment_delegations(coworker)
if self.task:
self.task.increment_delegations(coworker)
if calling.arguments:
try:
@@ -218,23 +230,25 @@ class ToolUsage:
error = ToolUsageErrorException(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
self.task.increment_tools_errors()
if self.agent.verbose:
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
)
return error # type: ignore # No return value expected
self.task.increment_tools_errors()
if self.task:
self.task.increment_tools_errors()
return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected
if self.tools_handler:
should_cache = True
if (
hasattr(original_tool, "cache_function")
and original_tool.cache_function # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
hasattr(available_tool, "cache_function")
and available_tool.cache_function # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
):
should_cache = original_tool.cache_function( # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
should_cache = available_tool.cache_function( # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
calling.arguments, result
)
@@ -262,41 +276,46 @@ class ToolUsage:
)
if (
hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
hasattr(available_tool, "result_as_answer")
and available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
):
result_as_answer = original_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
data["result_as_answer"] = result_as_answer
result_as_answer = available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
data["result_as_answer"] = result_as_answer # type: ignore
self.agent.tools_results.append(data)
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
return result # type: ignore # No return value expected
def _format_result(self, result: Any) -> None:
self.task.used_tools += 1
if self._should_remember_format(): # type: ignore # "_should_remember_format" of "ToolUsage" does not return a value (it only ever returns None)
result = self._remember_format(result=result) # type: ignore # "_remember_format" of "ToolUsage" does not return a value (it only ever returns None)
return result
def _should_remember_format(self) -> bool:
return self.task.used_tools % self._remember_format_after_usages == 0
def _format_result(self, result: Any) -> str:
if self.task:
self.task.used_tools += 1
if self._should_remember_format():
result = self._remember_format(result=result)
return str(result)
def _remember_format(self, result: str) -> None:
def _should_remember_format(self) -> bool:
if self.task:
return self.task.used_tools % self._remember_format_after_usages == 0
return False
def _remember_format(self, result: str) -> str:
result = str(result)
result += "\n\n" + self._i18n.slice("tools").format(
tools=self.tools_description, tool_names=self.tools_names
)
return result # type: ignore # No return value expected
return result
def _check_tool_repeated_usage(
self, calling: Union[ToolCalling, InstructorToolCalling]
) -> None:
) -> bool:
if not self.tools_handler:
return False # type: ignore # No return value expected
return False
if last_tool_usage := self.tools_handler.last_used_tool:
return (calling.tool_name == last_tool_usage.tool_name) and ( # type: ignore # No return value expected
return (calling.tool_name == last_tool_usage.tool_name) and (
calling.arguments == last_tool_usage.arguments
)
return False
def _select_tool(self, tool_name: str) -> Any:
order_tools = sorted(
@@ -315,10 +334,11 @@ class ToolUsage:
> 0.85
):
return tool
self.task.increment_tools_errors()
tool_selection_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
if self.task:
self.task.increment_tools_errors()
tool_selection_data: Dict[str, Any] = {
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
"tool_name": tool_name,
"tool_args": {},
"tool_class": self.tools_description,
@@ -351,7 +371,9 @@ class ToolUsage:
descriptions.append(tool.description)
return "\n--\n".join(descriptions)
def _function_calling(self, tool_string: str):
def _function_calling(
self, tool_string: str
) -> Union[ToolCalling, InstructorToolCalling]:
model = (
InstructorToolCalling
if self.function_calling_llm.supports_function_calling()
@@ -373,18 +395,14 @@ class ToolUsage:
max_attempts=1,
)
tool_object = converter.to_pydantic()
calling = ToolCalling(
tool_name=tool_object["tool_name"],
arguments=tool_object["arguments"],
log=tool_string, # type: ignore
)
if not isinstance(tool_object, (ToolCalling, InstructorToolCalling)):
raise ToolUsageErrorException("Failed to parse tool calling")
if isinstance(calling, ConverterError):
raise calling
return tool_object
return calling
def _original_tool_calling(self, tool_string: str, raise_error: bool = False):
def _original_tool_calling(
self, tool_string: str, raise_error: bool = False
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
tool_name = self.action.tool
tool = self._select_tool(tool_name)
try:
@@ -409,12 +427,11 @@ class ToolUsage:
return ToolCalling(
tool_name=tool.name,
arguments=arguments,
log=tool_string,
)
def _tool_calling(
self, tool_string: str
) -> Union[ToolCalling, InstructorToolCalling]:
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
try:
try:
return self._original_tool_calling(tool_string, raise_error=True)
@@ -427,8 +444,9 @@ class ToolUsage:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
self.task.increment_tools_errors()
if self.agent.verbose:
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
@@ -458,6 +476,7 @@ class ToolUsage:
if isinstance(arguments, dict):
return arguments
except (ValueError, SyntaxError):
repaired_input = repair_json(tool_input)
pass # Continue to the next parsing attempt
# Attempt 3: Parse as JSON5
@@ -470,7 +489,7 @@ class ToolUsage:
# Attempt 4: Repair JSON
try:
repaired_input = repair_json(tool_input, skip_json_loads=True)
repaired_input = str(repair_json(tool_input, skip_json_loads=True))
self._printer.print(
content=f"Repaired JSON: {repaired_input}", color="blue"
)
@@ -490,8 +509,8 @@ class ToolUsage:
def _emit_validate_input_error(self, final_error: str):
tool_selection_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
"tool_name": self.action.tool,
"tool_args": str(self.action.tool_input),
"tool_class": self.__class__.__name__,
@@ -507,14 +526,19 @@ class ToolUsage:
ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
)
def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None:
def on_tool_error(
self,
tool: Any,
tool_calling: Union[ToolCalling, InstructorToolCalling],
e: Exception,
) -> None:
event_data = self._prepare_event_data(tool, tool_calling)
crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))
def on_tool_use_finished(
self,
tool: Any,
tool_calling: ToolCalling,
tool_calling: Union[ToolCalling, InstructorToolCalling],
from_cache: bool,
started_at: float,
result: Any,
@@ -531,16 +555,24 @@ class ToolUsage:
)
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
def _prepare_event_data(
self, tool: Any, tool_calling: Union[ToolCalling, InstructorToolCalling]
) -> dict:
event_data = {
"agent_key": self.agent.key,
"agent_role": (self.agent._original_role or self.agent.role),
"run_attempts": self._run_attempts,
"delegations": self.task.delegations,
"delegations": self.task.delegations if self.task else 0,
"tool_name": tool.name,
"tool_args": tool_calling.arguments,
"tool_class": tool.__class__.__name__,
"agent": self.agent, # Adding agent for fingerprint extraction
"agent_key": (
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
),
"agent_role": (
getattr(self.agent, "_original_role", None)
or getattr(self.agent, "role", "unknown")
if self.agent
else "unknown"
),
}
# Include fingerprint context if available
@@ -562,21 +594,31 @@ class ToolUsage:
arguments = arguments.copy()
# Add security metadata under a designated key
if not "security_context" in arguments:
if "security_context" not in arguments:
arguments["security_context"] = {}
security_context = arguments["security_context"]
# Add agent fingerprint if available
if hasattr(self, "agent") and hasattr(self.agent, "security_config"):
security_context["agent_fingerprint"] = self.agent.security_config.fingerprint.to_dict()
if self.agent and hasattr(self.agent, "security_config"):
security_config = getattr(self.agent, "security_config", None)
if security_config and hasattr(security_config, "fingerprint"):
try:
security_context["agent_fingerprint"] = (
security_config.fingerprint.to_dict()
)
except AttributeError:
pass
# Add task fingerprint if available
if hasattr(self, "task") and hasattr(self.task, "security_config"):
security_context["task_fingerprint"] = self.task.security_config.fingerprint.to_dict()
# Add crew fingerprint if available
if hasattr(self, "crew") and hasattr(self.crew, "security_config"):
security_context["crew_fingerprint"] = self.crew.security_config.fingerprint.to_dict()
if self.task and hasattr(self.task, "security_config"):
security_config = getattr(self.task, "security_config", None)
if security_config and hasattr(security_config, "fingerprint"):
try:
security_context["task_fingerprint"] = (
security_config.fingerprint.to_dict()
)
except AttributeError:
pass
return arguments

View File

@@ -24,7 +24,10 @@
"manager_request": "Your best answer to your coworker asking you this, accounting for the context shared.",
"formatted_task_instructions": "Ensure your final answer contains only the content in the following format: {output_format}\n\nEnsure the final output does not include any code block markers like ```json or ```python.",
"conversation_history_instruction": "You are a member of a crew collaborating to achieve a common goal. Your task is a specific action that contributes to this larger objective. For additional context, please review the conversation history between you and the user that led to the initiation of this crew. Use any relevant information or feedback from the conversation to inform your task execution and ensure your response aligns with both the immediate task and the crew's overall goals.",
"feedback_instructions": "User feedback: {feedback}\nInstructions: Use this feedback to enhance the next output iteration.\nNote: Do not respond or add commentary."
"feedback_instructions": "User feedback: {feedback}\nInstructions: Use this feedback to enhance the next output iteration.\nNote: Do not respond or add commentary.",
"lite_agent_system_prompt_with_tools": "You are {role}. {backstory}\nYour personal goal is: {goal}\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"lite_agent_system_prompt_without_tools": "You are {role}. {backstory}\nYour personal goal is: {goal}\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"lite_agent_response_format": "\nIMPORTANT: Your final answer MUST contain all the information requested in the following format: {response_format}\n\nIMPORTANT: Ensure the final output does not include any code block markers like ```json or ```python."
},
"errors": {
"force_final_answer_error": "You can't keep going, here is the best final answer you generated:\n\n {formatted_answer}",

View File

@@ -0,0 +1,431 @@
import json
import re
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from crewai.agents.parser import (
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE,
AgentAction,
AgentFinish,
CrewAgentParser,
OutputParserException,
)
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.utilities import I18N, Printer
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
"""Parse tools to be used for the task."""
tools_list = []
for tool in tools:
if isinstance(tool, CrewAITool):
tools_list.append(tool.to_structured_tool())
else:
raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
return tools_list
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
"""Get the names of the tools."""
return ", ".join([t.name for t in tools])
def render_text_description_and_args(
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
) -> str:
"""Render the tool name, description, and args in plain text.
search: This tool is used for search, args: {"query": {"type": "string"}}
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
"""
tool_strings = []
for tool in tools:
tool_strings.append(tool.description)
return "\n".join(tool_strings)
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached."""
return iterations >= max_iterations
def handle_max_iterations_exceeded(
formatted_answer: Union[AgentAction, AgentFinish, None],
printer: Printer,
i18n: I18N,
messages: List[Dict[str, str]],
llm: Union[LLM, BaseLLM],
callbacks: List[Any],
) -> Union[AgentAction, AgentFinish]:
"""
Handles the case when the maximum number of iterations is exceeded.
Performs one more LLM call to get the final answer.
Parameters:
formatted_answer: The last formatted answer from the agent.
Returns:
The final formatted answer after exceeding max iterations.
"""
printer.print(
content="Maximum iterations reached. Requesting final answer.",
color="yellow",
)
if formatted_answer and hasattr(formatted_answer, "text"):
assistant_message = (
formatted_answer.text + f'\n{i18n.errors("force_final_answer")}'
)
else:
assistant_message = i18n.errors("force_final_answer")
messages.append(format_message_for_llm(assistant_message, role="assistant"))
# Perform one more LLM call to get the final answer
answer = llm.call(
messages,
callbacks=callbacks,
)
if answer is None or answer == "":
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
formatted_answer = format_answer(answer)
# Return the formatted answer, regardless of its type
return formatted_answer
def format_message_for_llm(prompt: str, role: str = "user") -> Dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
def format_answer(answer: str) -> Union[AgentAction, AgentFinish]:
"""Format a response from the LLM into an AgentAction or AgentFinish."""
try:
return CrewAgentParser.parse_text(answer)
except Exception:
# If parsing fails, return a default AgentFinish
return AgentFinish(
thought="Failed to parse LLM response",
output=answer,
text=answer,
)
def enforce_rpm_limit(
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
) -> None:
"""Enforce the requests per minute (RPM) limit if applicable."""
if request_within_rpm_limit:
request_within_rpm_limit()
def get_llm_response(
llm: Union[LLM, BaseLLM],
messages: List[Dict[str, str]],
callbacks: List[Any],
printer: Printer,
) -> str:
"""Call the LLM and return the response, handling any invalid responses."""
try:
answer = llm.call(
messages,
callbacks=callbacks,
)
except Exception as e:
printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return answer
def process_llm_response(
answer: str, use_stop_words: bool
) -> Union[AgentAction, AgentFinish]:
"""Process the LLM response and format it into an AgentAction or AgentFinish."""
if not use_stop_words:
try:
# Preliminary parsing to check for errors.
format_answer(answer)
except OutputParserException as e:
if FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE in e.error:
answer = answer.split("Observation:")[0].strip()
return format_answer(answer)
def handle_agent_action_core(
formatted_answer: AgentAction,
tool_result: ToolResult,
messages: Optional[List[Dict[str, str]]] = None,
step_callback: Optional[Callable] = None,
show_logs: Optional[Callable] = None,
) -> Union[AgentAction, AgentFinish]:
"""Core logic for handling agent actions and tool results.
Args:
formatted_answer: The agent's action
tool_result: The result of executing the tool
messages: Optional list of messages to append results to
step_callback: Optional callback to execute after processing
show_logs: Optional function to show logs
Returns:
Either an AgentAction or AgentFinish
"""
if step_callback:
step_callback(tool_result)
formatted_answer.text += f"\nObservation: {tool_result.result}"
formatted_answer.result = tool_result.result
if tool_result.result_as_answer:
return AgentFinish(
thought="",
output=tool_result.result,
text=formatted_answer.text,
)
if show_logs:
show_logs(formatted_answer)
if messages is not None:
messages.append({"role": "assistant", "content": tool_result.result})
return formatted_answer
def handle_unknown_error(printer: Any, exception: Exception) -> None:
"""Handle unknown errors by informing the user.
Args:
printer: Printer instance for output
exception: The exception that occurred
"""
printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
printer.print(
content=f"Error details: {exception}",
color="red",
)
def handle_output_parser_exception(
e: OutputParserException,
messages: List[Dict[str, str]],
iterations: int,
log_error_after: int = 3,
printer: Optional[Any] = None,
) -> AgentAction:
"""Handle OutputParserException by updating messages and formatted_answer.
Args:
e: The OutputParserException that occurred
messages: List of messages to append to
iterations: Current iteration count
log_error_after: Number of iterations after which to log errors
printer: Optional printer instance for logging
Returns:
AgentAction: A formatted answer with the error
"""
messages.append({"role": "user", "content": e.error})
formatted_answer = AgentAction(
text=e.error,
tool="",
tool_input="",
thought="",
)
if iterations > log_error_after and printer:
printer.print(
content=f"Error parsing LLM output, agent will retry: {e.error}",
color="red",
)
return formatted_answer
def is_context_length_exceeded(exception: Exception) -> bool:
"""Check if the exception is due to context length exceeding.
Args:
exception: The exception to check
Returns:
bool: True if the exception is due to context length exceeding
"""
return LLMContextLengthExceededException(str(exception))._is_context_limit_error(
str(exception)
)
def handle_context_length(
respect_context_window: bool,
printer: Any,
messages: List[Dict[str, str]],
llm: Any,
callbacks: List[Any],
i18n: Any,
) -> None:
"""Handle context length exceeded by either summarizing or raising an error.
Args:
respect_context_window: Whether to respect context window
printer: Printer instance for output
messages: List of messages to summarize
llm: LLM instance for summarization
callbacks: List of callbacks for LLM
i18n: I18N instance for messages
"""
if respect_context_window:
printer.print(
content="Context length exceeded. Summarizing content to fit the model context window.",
color="yellow",
)
summarize_messages(messages, llm, callbacks, i18n)
else:
printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
def summarize_messages(
messages: List[Dict[str, str]],
llm: Any,
callbacks: List[Any],
i18n: Any,
) -> None:
"""Summarize messages to fit within context window.
Args:
messages: List of messages to summarize
llm: LLM instance for summarization
callbacks: List of callbacks for LLM
i18n: I18N instance for messages
"""
messages_groups = []
for message in messages:
content = message["content"]
cut_size = llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append({"content": content[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
summary = llm.call(
[
format_message_for_llm(
i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
i18n.slice("summarize_instruction").format(group=group["content"]),
),
],
callbacks=callbacks,
)
summarized_contents.append({"content": str(summary)})
merged_summary = " ".join(content["content"] for content in summarized_contents)
messages.clear()
messages.append(
format_message_for_llm(
i18n.slice("summary").format(merged_summary=merged_summary)
)
)
def show_agent_logs(
printer: Printer,
agent_role: str,
formatted_answer: Optional[Union[AgentAction, AgentFinish]] = None,
task_description: Optional[str] = None,
verbose: bool = False,
) -> None:
"""Show agent logs for both start and execution states.
Args:
printer: Printer instance for output
agent_role: Role of the agent
formatted_answer: Optional AgentAction or AgentFinish for execution logs
task_description: Optional task description for start logs
verbose: Whether to show verbose output
"""
if not verbose:
return
agent_role = agent_role.split("\n")[0]
if formatted_answer is None:
# Start logs
printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
if task_description:
printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{task_description}\033[00m"
)
else:
# Execution logs
printer.print(
content=f"\n\n\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
)
if isinstance(formatted_answer, AgentAction):
thought = re.sub(r"\n+", "\n", formatted_answer.thought)
formatted_json = json.dumps(
formatted_answer.tool_input,
indent=2,
ensure_ascii=False,
)
if thought and thought != "":
printer.print(
content=f"\033[95m## Thought:\033[00m \033[92m{thought}\033[00m"
)
printer.print(
content=f"\033[95m## Using tool:\033[00m \033[92m{formatted_answer.tool}\033[00m"
)
printer.print(
content=f"\033[95m## Tool Input:\033[00m \033[92m\n{formatted_json}\033[00m"
)
printer.print(
content=f"\033[95m## Tool Output:\033[00m \033[92m\n{formatted_answer.result}\033[00m"
)
elif isinstance(formatted_answer, AgentFinish):
printer.print(
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
@@ -74,3 +74,31 @@ class AgentExecutionErrorEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
# New event classes for LiteAgent
class LiteAgentExecutionStartedEvent(BaseEvent):
"""Event emitted when a LiteAgent starts executing"""
agent_info: Dict[str, Any]
tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
messages: Union[str, List[Dict[str, str]]]
type: str = "lite_agent_execution_started"
model_config = {"arbitrary_types_allowed": True}
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""
agent_info: Dict[str, Any]
output: str
type: str = "lite_agent_execution_completed"
class LiteAgentExecutionErrorEvent(BaseEvent):
"""Event emitted when a LiteAgent encounters an error during execution"""
agent_info: Dict[str, Any]
error: str
type: str = "lite_agent_execution_error"

View File

@@ -16,7 +16,13 @@ from crewai.utilities.events.llm_events import (
)
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
@@ -65,7 +71,7 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
self.formatter = ConsoleFormatter()
self.formatter = ConsoleFormatter(verbose=True)
# ----------- CREW EVENTS -----------
@@ -171,6 +177,36 @@ class EventListener(BaseEventListener):
self.formatter.current_crew_tree,
)
# ----------- LITE AGENT EVENTS -----------
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_execution_started(
source, event: LiteAgentExecutionStartedEvent
):
"""Handle LiteAgent execution started event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="started", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_execution_completed(
source, event: LiteAgentExecutionCompletedEvent
):
"""Handle LiteAgent execution completed event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="completed", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent):
"""Handle LiteAgent execution error event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"],
status="failed",
error=event.error,
**event.agent_info,
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Any, Dict, Optional
from rich.console import Console
from rich.panel import Panel
@@ -13,6 +13,7 @@ class ConsoleFormatter:
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
def __init__(self, verbose: bool = False):
@@ -390,21 +391,24 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
if not self.verbose:
return None
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
# Update tool usage count
self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1
# Find existing tool node or create new one
tool_branch = None
for child in agent_branch.children:
if tool_name in str(child.label):
tool_branch = child
break
if not tool_branch:
tool_branch = agent_branch.add("")
# Find or create tool node
tool_branch = self.current_tool_branch
if tool_branch is None:
tool_branch = branch_to_use.add("")
self.current_tool_branch = tool_branch
# Update label with current count
self.update_tree_label(
@@ -414,11 +418,10 @@ class ConsoleFormatter:
"yellow",
)
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
# Only print if this is a new tool usage
if tool_branch not in branch_to_use.children:
self.print(tree_to_use)
self.print()
return tool_branch
@@ -429,17 +432,29 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage finished event."""
if not self.verbose or tool_branch is None or crew_tree is None:
if not self.verbose or tool_branch is None:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tree_to_use is None:
return
# Update the existing tool node's label
self.update_tree_label(
tool_branch,
"🔧",
f"Used {tool_name} ({self.tool_usage_counts[tool_name]})",
"green",
)
self.print(crew_tree)
self.print()
# Clear the current tool branch as we're done with it
self.current_tool_branch = None
# Only print if we have a valid tree and the tool node is still in it
if isinstance(tree_to_use, Tree) and tool_branch in tree_to_use.children:
self.print(tree_to_use)
self.print()
def handle_tool_usage_error(
self,
@@ -452,6 +467,9 @@ class ConsoleFormatter:
if not self.verbose:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tool_branch:
self.update_tree_label(
tool_branch,
@@ -459,8 +477,9 @@ class ConsoleFormatter:
f"{tool_name} ({self.tool_usage_counts[tool_name]})",
"red",
)
self.print(crew_tree)
self.print()
if tree_to_use:
self.print(tree_to_use)
self.print()
# Show error panel
error_content = self.create_status_content(
@@ -474,19 +493,23 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle LLM call started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
if not self.verbose:
return None
# Only add thinking status if it doesn't exist
if not any("Thinking" in str(child.label) for child in agent_branch.children):
tool_branch = agent_branch.add("")
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return None
# Only add thinking status if we don't have a current tool branch
if self.current_tool_branch is None:
tool_branch = branch_to_use.add("")
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
self.print(tree_to_use)
self.print()
return tool_branch
return None
@@ -497,20 +520,27 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> None:
"""Handle LLM call completed event."""
if (
not self.verbose
or tool_branch is None
or agent_branch is None
or crew_tree is None
):
if not self.verbose or tool_branch is None:
return
# Remove the thinking status node when complete
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
# Remove the thinking status node when complete, but only if it exists
if "Thinking" in str(tool_branch.label):
if tool_branch in agent_branch.children:
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
try:
# Check if the node is actually in the children list
if tool_branch in branch_to_use.children:
branch_to_use.children.remove(tool_branch)
self.print(tree_to_use)
self.print()
except Exception:
# If any error occurs during removal, just continue without removing
pass
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -519,11 +549,15 @@ class ConsoleFormatter:
if not self.verbose:
return
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
# Update tool branch if it exists
if tool_branch:
tool_branch.label = Text("❌ LLM Failed", style="red bold")
self.print(crew_tree)
self.print()
if tree_to_use:
self.print(tree_to_use)
self.print()
# Show error panel
error_content = Text()
@@ -658,3 +692,94 @@ class ConsoleFormatter:
self.print_panel(failure_content, "Test Failure", "red")
self.print()
def create_lite_agent_branch(self, lite_agent_role: str) -> Optional[Tree]:
"""Create and initialize a lite agent branch."""
if not self.verbose:
return None
# Create initial tree for LiteAgent if it doesn't exist
if not self.current_lite_agent_branch:
lite_agent_label = Text()
lite_agent_label.append("🤖 LiteAgent: ", style="cyan bold")
lite_agent_label.append(lite_agent_role, style="cyan")
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append("In Progress", style="yellow")
lite_agent_tree = Tree(lite_agent_label)
self.current_lite_agent_branch = lite_agent_tree
self.print(lite_agent_tree)
self.print()
return self.current_lite_agent_branch
def update_lite_agent_status(
self,
lite_agent_branch: Optional[Tree],
lite_agent_role: str,
status: str = "completed",
**fields: Dict[str, Any],
) -> None:
"""Update lite agent status in the tree."""
if not self.verbose or lite_agent_branch is None:
return
# Determine style based on status
if status == "completed":
prefix, style = "✅ LiteAgent:", "green"
status_text = "Completed"
title = "LiteAgent Completion"
elif status == "failed":
prefix, style = "❌ LiteAgent:", "red"
status_text = "Failed"
title = "LiteAgent Error"
else:
prefix, style = "🤖 LiteAgent:", "yellow"
status_text = "In Progress"
title = "LiteAgent Status"
# Update the tree label
lite_agent_label = Text()
lite_agent_label.append(f"{prefix} ", style=f"{style} bold")
lite_agent_label.append(lite_agent_role, style=style)
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append(status_text, style=f"{style} bold")
lite_agent_branch.label = lite_agent_label
self.print(lite_agent_branch)
self.print()
# Show status panel if additional fields are provided
if fields:
content = self.create_status_content(
f"LiteAgent {status.title()}", lite_agent_role, style, **fields
)
self.print_panel(content, title, style)
def handle_lite_agent_execution(
self,
lite_agent_role: str,
status: str = "started",
error: Any = None,
**fields: Dict[str, Any],
) -> None:
"""Handle lite agent execution events with consistent formatting."""
if not self.verbose:
return
if status == "started":
# Create or get the LiteAgent branch
lite_agent_branch = self.create_lite_agent_branch(lite_agent_role)
if lite_agent_branch and fields:
# Show initial status panel
content = self.create_status_content(
"LiteAgent Session Started", lite_agent_role, "cyan", **fields
)
self.print_panel(content, "LiteAgent Started", "cyan")
else:
# Update existing LiteAgent branch
if error:
fields["Error"] = error
self.update_lite_agent_status(
self.current_lite_agent_branch, lite_agent_role, status, **fields
)

View File

@@ -9,7 +9,7 @@ class Prompts(BaseModel):
"""Manages and generates prompts for a generic agent."""
i18n: I18N = Field(default=I18N())
tools: list[Any] = Field(default=[])
has_tools: bool = False
system_template: Optional[str] = None
prompt_template: Optional[str] = None
response_template: Optional[str] = None
@@ -19,7 +19,7 @@ class Prompts(BaseModel):
def task_execution(self) -> dict[str, str]:
"""Generate a standard prompt for task execution."""
slices = ["role_playing"]
if len(self.tools) > 0:
if self.has_tools:
slices.append("tools")
else:
slices.append("no_tools")

View File

@@ -0,0 +1,126 @@
from typing import Any, Dict, List, Optional
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: List[CrewStructuredTool],
i18n: I18N,
agent_key: Optional[str] = None,
agent_role: Optional[str] = None,
tools_handler: Optional[Any] = None,
task: Optional[Any] = None,
agent: Optional[Any] = None,
function_calling_llm: Optional[Any] = None,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
"""Execute a tool and check if the result should be treated as a final answer.
Args:
agent_action: The action containing the tool to execute
tools: List of available tools
i18n: Internationalization settings
agent_key: Optional key for event emission
agent_role: Optional role for event emission
tools_handler: Optional tools handler for tool execution
task: Optional task for tool execution
agent: Optional agent instance for tool execution
function_calling_llm: Optional LLM for function calling
Returns:
ToolResult containing the execution result and whether it should be treated as a final answer
"""
try:
# Create tool name to tool map
tool_name_to_tool_map = {tool.name: tool for tool in tools}
# Emit tool usage event if agent info is available
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
if agent:
if hasattr(agent, "set_fingerprint") and callable(
agent.set_fingerprint
):
if isinstance(fingerprint_context, dict):
try:
fingerprint_obj = Fingerprint.from_dict(fingerprint_context)
agent.set_fingerprint(fingerprint_obj)
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}")
event_data = {
"agent_key": agent_key,
"agent_role": agent_role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"agent": agent,
}
event_data.update(fingerprint_context)
crewai_event_bus.emit(
agent,
event=ToolUsageStartedEvent(
**event_data,
),
)
# Create tool usage instance
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=tools,
function_calling_llm=function_calling_llm,
task=task,
agent=agent,
action=agent_action,
)
# Parse tool calling
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
return ToolResult(tool_calling.message, False)
# Check if tool name matches
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(tool_result, tool.result_as_answer)
# Handle invalid tool name
tool_result = i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in tools]),
)
return ToolResult(tool_result, False)
except Exception as e:
# Emit error event if agent info is available
if agent_key and agent_role and agent:
crewai_event_bus.emit(
agent,
event=ToolUsageErrorEvent(
agent_key=agent_key,
agent_role=agent_role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
error=str(e),
),
)
raise e