mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 16:18:30 +00:00
372 lines
14 KiB
Python
372 lines
14 KiB
Python
import os
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from langchain.agents.agent import RunnableAgent
|
|
from langchain.agents.tools import tool as LangChainTool
|
|
from langchain.tools.render import render_text_description
|
|
from langchain_core.agents import AgentAction
|
|
from langchain_core.callbacks import BaseCallbackHandler
|
|
from langchain_openai import ChatOpenAI
|
|
from pydantic import (
|
|
UUID4,
|
|
BaseModel,
|
|
ConfigDict,
|
|
Field,
|
|
InstanceOf,
|
|
PrivateAttr,
|
|
field_validator,
|
|
model_validator,
|
|
)
|
|
from pydantic_core import PydanticCustomError
|
|
|
|
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser, ToolsHandler
|
|
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
|
from crewai.utilities import I18N, Logger, Prompts, RPMController
|
|
from crewai.utilities.token_counter_callback import TokenCalcHandler, TokenProcess
|
|
from agentops.agent import track_agent
|
|
|
|
|
|
@track_agent()
|
|
class Agent(BaseModel):
|
|
"""Represents an agent in a system.
|
|
|
|
Each agent has a role, a goal, a backstory, and an optional language model (llm).
|
|
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
|
|
|
|
Attributes:
|
|
agent_executor: An instance of the CrewAgentExecutor class.
|
|
role: The role of the agent.
|
|
goal: The objective of the agent.
|
|
backstory: The backstory of the agent.
|
|
config: Dict representation of agent configuration.
|
|
llm: The language model that will run the agent.
|
|
function_calling_llm: The language model that will the tool calling for this agent, it overrides the crew function_calling_llm.
|
|
max_iter: Maximum number of iterations for an agent to execute a task.
|
|
memory: Whether the agent should have memory or not.
|
|
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
|
|
verbose: Whether the agent execution should be in verbose mode.
|
|
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
|
|
tools: Tools at agents disposal
|
|
step_callback: Callback to be executed after each step of the agent execution.
|
|
callbacks: A list of callback functions from the langchain library that are triggered during the agent's execution process
|
|
"""
|
|
|
|
__hash__ = object.__hash__ # type: ignore
|
|
_logger: Logger = PrivateAttr()
|
|
_rpm_controller: RPMController = PrivateAttr(default=None)
|
|
_request_within_rpm_limit: Any = PrivateAttr(default=None)
|
|
_token_process: TokenProcess = TokenProcess()
|
|
agent_ops_agent_name: str = None
|
|
agent_ops_agent_id: str = None
|
|
|
|
formatting_errors: int = 0
|
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
id: UUID4 = Field(
|
|
default_factory=uuid.uuid4,
|
|
frozen=True,
|
|
description="Unique identifier for the object, not set by user.",
|
|
)
|
|
role: str = Field(description="Role of the agent")
|
|
goal: str = Field(description="Objective of the agent")
|
|
backstory: str = Field(description="Backstory of the agent")
|
|
cache: bool = Field(
|
|
default=True,
|
|
description="Whether the agent should use a cache for tool usage.",
|
|
)
|
|
config: Optional[Dict[str, Any]] = Field(
|
|
description="Configuration for the agent",
|
|
default=None,
|
|
)
|
|
max_rpm: Optional[int] = Field(
|
|
default=None,
|
|
description="Maximum number of requests per minute for the agent execution to be respected.",
|
|
)
|
|
verbose: bool = Field(
|
|
default=False, description="Verbose mode for the Agent Execution"
|
|
)
|
|
allow_delegation: bool = Field(
|
|
default=True, description="Allow delegation of tasks to agents"
|
|
)
|
|
tools: Optional[List[Any]] = Field(
|
|
default_factory=list, description="Tools at agents disposal"
|
|
)
|
|
max_iter: Optional[int] = Field(
|
|
default=25, description="Maximum iterations for an agent to execute a task"
|
|
)
|
|
max_execution_time: Optional[int] = Field(
|
|
default=None,
|
|
description="Maximum execution time for an agent to execute a task",
|
|
)
|
|
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
|
|
default=None, description="An instance of the CrewAgentExecutor class."
|
|
)
|
|
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
|
|
tools_handler: InstanceOf[ToolsHandler] = Field(
|
|
default=None, description="An instance of the ToolsHandler class."
|
|
)
|
|
cache_handler: InstanceOf[CacheHandler] = Field(
|
|
default=None, description="An instance of the CacheHandler class."
|
|
)
|
|
step_callback: Optional[Any] = Field(
|
|
default=None,
|
|
description="Callback to be executed after each step of the agent execution.",
|
|
)
|
|
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
|
|
llm: Any = Field(
|
|
default_factory=lambda: ChatOpenAI(
|
|
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4")
|
|
),
|
|
description="Language model that will run the agent.",
|
|
)
|
|
function_calling_llm: Optional[Any] = Field(
|
|
description="Language model that will run the agent.", default=None
|
|
)
|
|
callbacks: Optional[List[InstanceOf[BaseCallbackHandler]]] = Field(
|
|
default=None, description="Callback to be executed"
|
|
)
|
|
|
|
_original_role: str | None = None
|
|
_original_goal: str | None = None
|
|
_original_backstory: str | None = None
|
|
|
|
def __init__(__pydantic_self__, **data):
|
|
config = data.pop("config", {})
|
|
super().__init__(**config, **data)
|
|
__pydantic_self__.agent_ops_agent_name = __pydantic_self__.role
|
|
|
|
@field_validator("id", mode="before")
|
|
@classmethod
|
|
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
|
|
if v:
|
|
raise PydanticCustomError(
|
|
"may_not_set_field", "This field is not to be set by the user.", {}
|
|
)
|
|
|
|
@model_validator(mode="after")
|
|
def set_attributes_based_on_config(self) -> "Agent":
|
|
"""Set attributes based on the agent configuration."""
|
|
if self.config:
|
|
for key, value in self.config.items():
|
|
setattr(self, key, value)
|
|
return self
|
|
|
|
@model_validator(mode="after")
|
|
def set_private_attrs(self):
|
|
"""Set private attributes."""
|
|
self._logger = Logger(self.verbose)
|
|
if self.max_rpm and not self._rpm_controller:
|
|
self._rpm_controller = RPMController(
|
|
max_rpm=self.max_rpm, logger=self._logger
|
|
)
|
|
return self
|
|
|
|
@model_validator(mode="after")
|
|
def set_agent_executor(self) -> "Agent":
|
|
"""set agent executor is set."""
|
|
if hasattr(self.llm, "model_name"):
|
|
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
|
|
|
|
# Ensure self.llm.callbacks is a list
|
|
if not isinstance(self.llm.callbacks, list):
|
|
self.llm.callbacks = []
|
|
|
|
# Check if an instance of TokenCalcHandler already exists in the list
|
|
if not any(isinstance(handler, TokenCalcHandler) for handler in self.llm.callbacks):
|
|
self.llm.callbacks.append(token_handler)
|
|
|
|
if not self.agent_executor:
|
|
if not self.cache_handler:
|
|
self.cache_handler = CacheHandler()
|
|
self.set_cache_handler(self.cache_handler)
|
|
return self
|
|
|
|
def execute_task(
|
|
self,
|
|
task: Any,
|
|
context: Optional[str] = None,
|
|
tools: Optional[List[Any]] = None,
|
|
) -> str:
|
|
"""Execute a task with the agent.
|
|
|
|
Args:
|
|
task: Task to execute.
|
|
context: Context to execute the task in.
|
|
tools: Tools to use for the task.
|
|
|
|
Returns:
|
|
Output of the agent
|
|
"""
|
|
if self.tools_handler:
|
|
self.tools_handler.last_used_tool = {}
|
|
|
|
task_prompt = task.prompt()
|
|
|
|
if context:
|
|
task_prompt = self.i18n.slice("task_with_context").format(
|
|
task=task_prompt, context=context
|
|
)
|
|
|
|
if self.crew and self.crew.memory:
|
|
contextual_memory = ContextualMemory(
|
|
self.crew._short_term_memory,
|
|
self.crew._long_term_memory,
|
|
self.crew._entity_memory,
|
|
)
|
|
memory = contextual_memory.build_context_for_task(task, context)
|
|
if memory.strip() != "":
|
|
task_prompt += self.i18n.slice("memory").format(memory=memory)
|
|
|
|
tools = tools or self.tools
|
|
parsed_tools = self._parse_tools(tools)
|
|
|
|
self.create_agent_executor(tools=tools)
|
|
self.agent_executor.tools = parsed_tools
|
|
self.agent_executor.task = task
|
|
|
|
self.agent_executor.tools_description = render_text_description(parsed_tools)
|
|
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
|
|
|
|
result = self.agent_executor.invoke(
|
|
{
|
|
"input": task_prompt,
|
|
"tool_names": self.agent_executor.tools_names,
|
|
"tools": self.agent_executor.tools_description,
|
|
}
|
|
)["output"]
|
|
|
|
if self.max_rpm:
|
|
self._rpm_controller.stop_rpm_counter()
|
|
|
|
return result
|
|
|
|
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
|
|
"""Set the cache handler for the agent.
|
|
|
|
Args:
|
|
cache_handler: An instance of the CacheHandler class.
|
|
"""
|
|
self.tools_handler = ToolsHandler()
|
|
if self.cache:
|
|
self.cache_handler = cache_handler
|
|
self.tools_handler.cache = cache_handler
|
|
self.create_agent_executor()
|
|
|
|
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
|
|
"""Set the rpm controller for the agent.
|
|
|
|
Args:
|
|
rpm_controller: An instance of the RPMController class.
|
|
"""
|
|
if not self._rpm_controller:
|
|
self._rpm_controller = rpm_controller
|
|
self.create_agent_executor()
|
|
|
|
def create_agent_executor(self, tools=None) -> None:
|
|
"""Create an agent executor for the agent.
|
|
|
|
Returns:
|
|
An instance of the CrewAgentExecutor class.
|
|
"""
|
|
tools = tools or self.tools
|
|
|
|
agent_args = {
|
|
"input": lambda x: x["input"],
|
|
"tools": lambda x: x["tools"],
|
|
"tool_names": lambda x: x["tool_names"],
|
|
"agent_scratchpad": lambda x: self.format_log_to_str(
|
|
x["intermediate_steps"]
|
|
),
|
|
}
|
|
|
|
executor_args = {
|
|
"llm": self.llm,
|
|
"i18n": self.i18n,
|
|
"crew": self.crew,
|
|
"crew_agent": self,
|
|
"tools": self._parse_tools(tools),
|
|
"verbose": self.verbose,
|
|
"original_tools": tools,
|
|
"handle_parsing_errors": True,
|
|
"max_iterations": self.max_iter,
|
|
"max_execution_time": self.max_execution_time,
|
|
"step_callback": self.step_callback,
|
|
"tools_handler": self.tools_handler,
|
|
"function_calling_llm": self.function_calling_llm,
|
|
"callbacks": self.callbacks,
|
|
}
|
|
|
|
if self._rpm_controller:
|
|
executor_args[
|
|
"request_within_rpm_limit"
|
|
] = self._rpm_controller.check_or_wait
|
|
|
|
prompt = Prompts(i18n=self.i18n, tools=tools).task_execution()
|
|
|
|
execution_prompt = prompt.partial(
|
|
goal=self.goal,
|
|
role=self.role,
|
|
backstory=self.backstory,
|
|
)
|
|
|
|
bind = self.llm.bind(stop=[self.i18n.slice("observation")])
|
|
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
|
|
self.agent_executor = CrewAgentExecutor(
|
|
agent=RunnableAgent(runnable=inner_agent), **executor_args
|
|
)
|
|
|
|
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
|
|
"""Interpolate inputs into the agent description and backstory."""
|
|
if self._original_role is None:
|
|
self._original_role = self.role
|
|
if self._original_goal is None:
|
|
self._original_goal = self.goal
|
|
if self._original_backstory is None:
|
|
self._original_backstory = self.backstory
|
|
|
|
if inputs:
|
|
self.role = self._original_role.format(**inputs)
|
|
self.goal = self._original_goal.format(**inputs)
|
|
self.backstory = self._original_backstory.format(**inputs)
|
|
|
|
def increment_formatting_errors(self) -> None:
|
|
"""Count the formatting errors of the agent."""
|
|
self.formatting_errors += 1
|
|
|
|
def format_log_to_str(
|
|
self,
|
|
intermediate_steps: List[Tuple[AgentAction, str]],
|
|
observation_prefix: str = "Observation: ",
|
|
llm_prefix: str = "",
|
|
) -> str:
|
|
"""Construct the scratchpad that lets the agent continue its thought process."""
|
|
thoughts = ""
|
|
for action, observation in intermediate_steps:
|
|
thoughts += action.log
|
|
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
|
|
return thoughts
|
|
|
|
def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]:
|
|
"""Parse tools to be used for the task."""
|
|
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
|
|
tools_list = []
|
|
try:
|
|
from crewai_tools import BaseTool as CrewAITool
|
|
|
|
for tool in tools:
|
|
if isinstance(tool, CrewAITool):
|
|
tools_list.append(tool.to_langchain())
|
|
else:
|
|
tools_list.append(tool)
|
|
except ModuleNotFoundError:
|
|
for tool in tools:
|
|
tools_list.append(tool)
|
|
return tools_list
|
|
|
|
@staticmethod
|
|
def __tools_names(tools) -> str:
|
|
return ", ".join([t.name for t in tools])
|
|
|
|
def __repr__(self):
|
|
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
|