Lorenzejay/byoa (#776)

* better spacing

* works with llama index

* works on langchain custom just need delegation to work

* cleanup for custom_agent class

* works with different argument expectations for agent_executor

* cleanup for hierarchial process, better agent_executor args handler and added to the crew agent doc page

* removed code examples for langchain + llama index, added to docs instead

* added key output if return is not a str for and added some tests

* added hinting for CustomAgent class

* removed pass as it was not needed

* closer just need to figuire ou agentTools

* running agents - llamaindex and langchain with base agent

* some cleanup on baseAgent

* minimum for agent to run for base class and ensure it works with hierarchical process

* cleanup for original agent to take on BaseAgent class

* Agent takes on langchainagent and cleanup across

* token handling working for usage_metrics to continue working

* installed llama-index, updated docs and added better name

* fixed some type errors

* base agent holds token_process

* heirarchail process uses proper tools and no longer relies on hasattr for token_processes

* removal of test_custom_agent_executions

* this fixes copying agents

* leveraging an executor class for trigger llamaindex agent

* llama index now has ask_human

* executor mixins added

* added output converter base class

* type listed

* cleanup for output conversions and tokenprocess eliminated redundancy

* properly handling tokens

* simplified token calc handling

* original agent with base agent builder structure setup

* better docs

* no more llama-index dep

* cleaner docs

* test fixes

* poetry reverts and better docs

* base_agent_tools set for third party agents

* updated task and test fix
This commit is contained in:
Lorenze Jay
2024-06-27 10:56:08 -07:00
committed by GitHub
parent da9cc5f097
commit 10997dd175
22 changed files with 637 additions and 407 deletions

View File

@@ -0,0 +1,256 @@
from copy import deepcopy
import uuid
from typing import Any, Dict, List, Optional
from abc import ABC, abstractmethod
from pydantic import (
UUID4,
BaseModel,
Field,
InstanceOf,
field_validator,
model_validator,
ConfigDict,
PrivateAttr,
)
from pydantic_core import PydanticCustomError
from crewai.utilities import I18N, RPMController, Logger
from crewai.agents import CacheHandler, ToolsHandler
from crewai.utilities.token_counter_callback import TokenProcess
class BaseAgent(ABC, BaseModel):
"""Abstract Base Class for all third party agents compatible with CrewAI.
Attributes:
id (UUID4): Unique identifier for the agent.
role (str): Role of the agent.
goal (str): Objective of the agent.
backstory (str): Backstory of the agent.
cache (bool): Whether the agent should use a cache for tool usage.
config (Optional[Dict[str, Any]]): Configuration for the agent.
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (Optional[int]): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
i18n (I18N): Internationalization settings.
cache_handler (InstanceOf[CacheHandler]): An instance of the CacheHandler class.
tools_handler (InstanceOf[ToolsHandler]): An instance of the ToolsHandler class.
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[Any]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
Abstract method to create an agent executor.
_parse_tools(tools: List[Any]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
interpolate_inputs(inputs: Dict[str, Any]) -> None:
Interpolate inputs into the agent description and backstory.
set_cache_handler(cache_handler: CacheHandler) -> None:
Set the cache handler for the agent.
increment_formatting_errors() -> None:
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
Set the rpm controller for the agent.
set_private_attrs() -> "BaseAgent":
Set private attributes.
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
formatting_errors: int = 0
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
cache: bool = Field(
default=True, description="Whether the agent should use a cache for tool usage."
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent", default=None
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: Optional[int] = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
llm: Any = Field(
default=None, description="Language model that will run the agent."
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
_original_role: str | None = None
_original_goal: str | None = None
_original_backstory: str | None = None
_token_process: TokenProcess = TokenProcess()
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
super().__init__(**config, **data)
@model_validator(mode="after")
def set_config_attributes(self):
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "BaseAgent":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@abstractmethod
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
pass
@abstractmethod
def create_agent_executor(self, tools=None) -> None:
pass
@abstractmethod
def _parse_tools(self, tools: List[Any]) -> List[Any]:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]):
"""Set the task tools that init BaseAgenTools class."""
pass
@abstractmethod
def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
if self._original_goal is None:
self._original_goal = self.goal
if self._original_backstory is None:
self._original_backstory = self.backstory
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.tools_handler = ToolsHandler()
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
print("Formatting errors incremented")
def copy(self):
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
"crew",
"llm",
}
copied_data = self.model_dump(exclude=exclude, exclude_unset=True)
copied_agent = self.__class__(**copied_data)
# Copy mutable attributes separately
copied_agent.tools = deepcopy(self.tools)
copied_agent.config = deepcopy(self.config)
# Preserve original values for interpolation
copied_agent._original_role = self._original_role
copied_agent._original_goal = self._original_goal
copied_agent._original_backstory = self._original_backstory
return copied_agent
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()

View File

@@ -0,0 +1,65 @@
import time
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
class CrewAgentExecutorMixin:
def _should_force_answer(self) -> bool:
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
if (
self.crew
and self.crew.memory
and "Action: Delegate work to coworker" not in output.log
):
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
self.crew._short_term_memory.save(memory)
def _create_long_term_memory(self, output) -> None:
if self.crew and self.crew.memory:
ltm_agent = TaskEvaluator(self.crew_agent)
evaluation = ltm_agent.evaluate(self.task, output.log)
if isinstance(evaluation, ConverterError):
return
long_term_memory = LongTermMemoryItem(
task=self.task.description,
agent=self.crew_agent.role,
quality=evaluation.quality,
datetime=str(time.time()),
expected_output=self.task.expected_output,
metadata={
"suggestions": evaluation.suggestions,
"quality": evaluation.quality,
},
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
relationships="\n".join([f"- {r}" for r in entity.relationships]),
)
self.crew._entity_memory.save(entity_memory)
def _ask_human_input(self, final_answer: dict) -> str:
"""Get human input."""
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)

View File

@@ -0,0 +1,81 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.utilities import I18N
class BaseAgentTools(BaseModel, ABC):
"""Default tools around agent delegation"""
agents: List[BaseAgent] = Field(description="List of agents in this crew.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
@abstractmethod
def tools(self):
pass
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return coworker
def delegate_work(
self, task: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to delegate a specific task to a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, task, context)
def ask_question(
self, question: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to ask a question, opinion or take from a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, question, context)
def _execute(self, agent: Union[str, None], task: str, context: Union[str, None]):
"""Execute the command."""
try:
if agent is None:
agent = ""
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
# have difficulty producing valid JSON.
# As a result, we end up with invalid JSON that is truncated like this:
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
agent_name = agent.casefold().replace('"', "").replace("\n", "")
agent = [
available_agent
for available_agent in self.agents
if available_agent.role.casefold().replace("\n", "") == agent_name
]
except Exception as _:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
task = Task(
description=task,
agent=agent,
expected_output="Your best answer to your coworker asking you this, accounting for the context shared.",
)
return agent.execute_task(task, context)

View File

@@ -0,0 +1,48 @@
from abc import ABC, abstractmethod
from typing import Any, Optional
from pydantic import BaseModel, Field, PrivateAttr
class OutputConverter(BaseModel, ABC):
"""
Abstract base class for converting task results into structured formats.
This class provides a framework for converting unstructured text into
either Pydantic models or JSON, tailored for specific agent requirements.
It uses a language model to interpret and structure the input text based
on given instructions.
Attributes:
text (str): The input text to be converted.
llm (Any): The language model used for conversion.
model (Any): The target model for structuring the output.
instructions (str): Specific instructions for the conversion process.
max_attempts (int): Maximum number of conversion attempts (default: 3).
"""
_is_gpt: bool = PrivateAttr(default=True)
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attemps: Optional[int] = Field(
description="Max number of attemps to try to get the output formated.",
default=3,
)
@abstractmethod
def to_pydantic(self, current_attempt=1):
"""Convert text to pydantic."""
pass
@abstractmethod
def to_json(self, current_attempt=1):
"""Convert text to json."""
pass
@abstractmethod
def _is_gpt(self, llm):
"""Return if llm provided is of gpt from openai."""
pass

View File

@@ -0,0 +1,27 @@
from typing import Any, Dict
class TokenProcess:
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
def sum_prompt_tokens(self, tokens: int):
self.prompt_tokens = self.prompt_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_completion_tokens(self, tokens: int):
self.completion_tokens = self.completion_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_successful_requests(self, requests: int):
self.successful_requests = self.successful_requests + requests
def get_summary(self) -> Dict[str, Any]:
return {
"total_tokens": self.total_tokens,
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"successful_requests": self.successful_requests,
}