Move to src dir usage (#99)

This commit is contained in:
Greyson LaLonde
2024-01-10 09:39:36 -05:00
committed by GitHub
parent c601fbdc12
commit d122d90df0
22 changed files with 17 additions and 6 deletions

4
src/crewai/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.process import Process
from crewai.task import Task

181
src/crewai/agent.py Normal file
View File

@@ -0,0 +1,181 @@
import uuid
from typing import Any, List, Optional
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationSummaryMemory
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agents import (
CacheHandler,
CrewAgentExecutor,
CrewAgentOutputParser,
ToolsHandler,
)
from crewai.prompts import Prompts
class Agent(BaseModel):
"""Represents an agent in a system.
Each agent has a role, a goal, a backstory, and an optional language model (llm).
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
llm: The language model that will run the agent.
memory: Whether the agent should have memory or not.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
"""
__hash__ = object.__hash__
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
llm: Optional[Any] = Field(
default_factory=lambda: ChatOpenAI(
temperature=0.7,
model_name="gpt-4",
),
description="Language model that will run the agent.",
)
memory: bool = Field(
default=True, description="Whether the agent should have memory or not"
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: List[Any] = Field(
default_factory=list, description="Tools at agents disposal"
)
agent_executor: Optional[InstanceOf[CrewAgentExecutor]] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
tools_handler: Optional[InstanceOf[ToolsHandler]] = Field(
default=None, description="An instance of the ToolsHandler class."
)
cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
default=CacheHandler(), description="An instance of the CacheHandler class."
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def check_agent_executor(self) -> "Agent":
if not self.agent_executor:
self.set_cache_handler(self.cache_handler)
return self
def execute_task(
self, task: str, context: str = None, tools: List[Any] = None
) -> str:
"""Execute a task with the agent.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent
"""
if context:
task = "\n".join(
[task, "\nThis is the context you are working with:", context]
)
tools = tools or self.tools
self.agent_executor.tools = tools
return self.agent_executor.invoke(
{
"input": task,
"tool_names": self.__tools_names(tools),
"tools": render_text_description(tools),
},
RunnableConfig(callbacks=[self.tools_handler]),
)["output"]
def set_cache_handler(self, cache_handler) -> None:
self.cache_handler = cache_handler
self.tools_handler = ToolsHandler(cache=self.cache_handler)
self.__create_agent_executor()
def __create_agent_executor(self) -> CrewAgentExecutor:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
agent_args = {
"input": lambda x: x["input"],
"tools": lambda x: x["tools"],
"tool_names": lambda x: x["tool_names"],
"agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
}
executor_args = {
"tools": self.tools,
"verbose": self.verbose,
"handle_parsing_errors": True,
}
if self.memory:
summary_memory = ConversationSummaryMemory(
llm=self.llm, memory_key="chat_history", input_key="input"
)
executor_args["memory"] = summary_memory
agent_args["chat_history"] = lambda x: x["chat_history"]
prompt = Prompts().task_execution_with_memory()
else:
prompt = Prompts().task_execution()
execution_prompt = prompt.partial(
goal=self.goal,
role=self.role,
backstory=self.backstory,
)
bind = self.llm.bind(stop=["\nObservation"])
inner_agent = (
agent_args
| execution_prompt
| bind
| CrewAgentOutputParser(
tools_handler=self.tools_handler, cache=self.cache_handler
)
)
self.agent_executor = CrewAgentExecutor(agent=inner_agent, **executor_args)
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -0,0 +1,4 @@
from .cache.cache_handler import CacheHandler
from .executor import CrewAgentExecutor
from .output_parser import CrewAgentOutputParser
from .tools_handler import ToolsHandler

2
src/crewai/agents/cache/__init__.py vendored Normal file
View File

@@ -0,0 +1,2 @@
from .cache_handler import CacheHandler
from .cache_hit import CacheHit

View File

@@ -0,0 +1,20 @@
from typing import Optional
from pydantic import PrivateAttr
class CacheHandler:
"""Callback handler for tool usage."""
_cache: PrivateAttr = {}
def __init__(self):
self._cache = {}
def add(self, tool, input, output):
input = input.strip()
self._cache[f"{tool}-{input}"] = output
def read(self, tool, input) -> Optional[str]:
input = input.strip()
return self._cache.get(f"{tool}-{input}")

18
src/crewai/agents/cache/cache_hit.py vendored Normal file
View File

@@ -0,0 +1,18 @@
from typing import Any
from pydantic import BaseModel, Field
from .cache_handler import CacheHandler
class CacheHit(BaseModel):
"""Cache Hit Object."""
class Config:
arbitrary_types_allowed = True
# Making it Any instead of AgentAction to avoind
# pydantic v1 vs v2 incompatibility, langchain should
# soon be updated to pydantic v2
action: Any = Field(description="Action taken")
cache: CacheHandler = Field(description="Cache Handler for the tool")

View File

@@ -0,0 +1,24 @@
from langchain_core.exceptions import OutputParserException
class TaskRepeatedUsageException(OutputParserException):
"""Exception raised when a task is used twice in a roll."""
error: str = "TaskRepeatedUsageException"
message: str = "I just used the {tool} tool with input {tool_input}. So I already know the result of that and don't need to use it now.\n"
def __init__(self, tool: str, tool_input: str, text: str):
self.text = text
self.tool = tool
self.tool_input = tool_input
self.message = self.message.format(tool=tool, tool_input=tool_input)
super().__init__(
error=self.error,
observation=self.message,
send_to_llm=True,
llm_output=self.text,
)
def __str__(self):
return self.message

View File

@@ -0,0 +1,126 @@
from typing import Dict, Iterator, List, Optional, Tuple, Union
from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool
from langchain.agents.tools import InvalidTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.tools import BaseTool
from ..tools.cache_tools import CacheTools
from .cache.cache_hit import CacheHit
class CrewAgentExecutor(AgentExecutor):
def _iter_next_step(
self,
name_to_tool_map: Dict[str, BaseTool],
color_mapping: Dict[str, str],
inputs: Dict[str, str],
intermediate_steps: List[Tuple[AgentAction, str]],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]:
"""Take a single step in the thought-action-observation loop.
Override this to take control of how the agent makes and acts on choices.
"""
try:
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)
# Call the LLM to see what to do.
output = self.agent.plan(
intermediate_steps,
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
except OutputParserException as e:
if isinstance(self.handle_parsing_errors, bool):
raise_error = not self.handle_parsing_errors
else:
raise_error = False
if raise_error:
raise ValueError(
"An output parsing error occurred. "
"In order to pass this error back to the agent and have it try "
"again, pass `handle_parsing_errors=True` to the AgentExecutor. "
f"This is the error: {str(e)}"
)
text = str(e)
if isinstance(self.handle_parsing_errors, bool):
if e.send_to_llm:
observation = str(e.observation)
text = str(e.llm_output)
else:
observation = "Invalid or incomplete response"
elif isinstance(self.handle_parsing_errors, str):
observation = self.handle_parsing_errors
elif callable(self.handle_parsing_errors):
observation = self.handle_parsing_errors(e)
else:
raise ValueError("Got unexpected type of `handle_parsing_errors`")
output = AgentAction("_Exception", observation, text)
if run_manager:
run_manager.on_agent_action(output, color="green")
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = ExceptionTool().run(
output.tool_input,
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=output, observation=observation)
return
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
yield output
return
# Override tool usage to use CacheTools
if isinstance(output, CacheHit):
cache = output.cache
action = output.action
tool = CacheTools(cache_handler=cache).tool()
output = action.copy()
output.tool_input = f"tool:{action.tool}|input:{action.tool_input}"
output.tool = tool.name
name_to_tool_map[tool.name] = tool
color_mapping[tool.name] = color_mapping[action.tool]
actions: List[AgentAction]
actions = [output] if isinstance(output, AgentAction) else output
yield from actions
for agent_action in actions:
if run_manager:
run_manager.on_agent_action(agent_action, color="green")
# Otherwise we lookup the tool
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = InvalidTool().run(
{
"requested_tool_name": agent_action.tool,
"available_tool_names": list(name_to_tool_map.keys()),
},
verbose=self.verbose,
color=None,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
yield AgentStep(action=agent_action, observation=observation)

View File

@@ -0,0 +1,75 @@
import re
from typing import Union
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish
from .cache import CacheHandler, CacheHit
from .exceptions import TaskRepeatedUsageException
from .tools_handler import ToolsHandler
FINAL_ANSWER_ACTION = "Final Answer:"
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = (
"Parsing LLM output produced both a final answer and a parse-able action:"
)
class CrewAgentOutputParser(ReActSingleInputOutputParser):
"""Parses ReAct-style LLM calls that have a single tool input.
Expects output to be in one of two formats.
If the output signals that an action should be taken,
should be in the below format. This will result in an AgentAction
being returned.
```
Thought: agent thought here
Action: search
Action Input: what is the temperature in SF?
```
If the output signals that a final answer should be given,
should be in the below format. This will result in an AgentFinish
being returned.
```
Thought: agent thought here
Final Answer: The temperature is 100 degrees
```
It also prevents tools from being reused in a roll.
"""
class Config:
arbitrary_types_allowed = True
tools_handler: ToolsHandler
cache: CacheHandler
def parse(self, text: str) -> Union[AgentAction, AgentFinish, CacheHit]:
FINAL_ANSWER_ACTION in text
regex = (
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
)
if action_match := re.search(regex, text, re.DOTALL):
action = action_match.group(1).strip()
action_input = action_match.group(2)
tool_input = action_input.strip(" ")
tool_input = tool_input.strip('"')
if last_tool_usage := self.tools_handler.last_used_tool:
usage = {
"tool": action,
"input": tool_input,
}
if usage == last_tool_usage:
raise TaskRepeatedUsageException(
tool=action, tool_input=tool_input, text=text
)
if result := self.cache.read(action, tool_input):
action = AgentAction(action, tool_input, text)
return CacheHit(action=action, cache=self.cache)
return super().parse(text)

View File

@@ -0,0 +1,44 @@
from typing import Any, Dict
from langchain.callbacks.base import BaseCallbackHandler
from ..tools.cache_tools import CacheTools
from .cache.cache_handler import CacheHandler
class ToolsHandler(BaseCallbackHandler):
"""Callback handler for tool usage."""
last_used_tool: Dict[str, Any] = {}
cache: CacheHandler = None
def __init__(self, cache: CacheHandler = None, **kwargs: Any):
"""Initialize the callback handler."""
self.cache = cache
super().__init__(**kwargs)
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
"""Run when tool starts running."""
name = serialized.get("name")
if name not in ["invalid_tool", "_Exception"]:
tools_usage = {
"tool": name,
"input": input_str,
}
self.last_used_tool = tools_usage
def on_tool_end(self, output: str, **kwargs: Any) -> Any:
"""Run when tool ends running."""
if (
"is not a valid tool" not in output
and "Invalid or incomplete response" not in output
and "Invalid Format" not in output
):
if self.last_used_tool["tool"] != CacheTools().name:
self.cache.add(
tool=self.last_used_tool["tool"],
input=self.last_used_tool["input"],
output=output,
)

132
src/crewai/crew.py Normal file
View File

@@ -0,0 +1,132 @@
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
Json,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.process import Process
from crewai.task import Task
from crewai.tools.agent_tools import AgentTools
class Crew(BaseModel):
"""
Represents a group of agents, defining how they should collaborate and the tasks they should perform.
Attributes:
tasks: List of tasks assigned to the crew.
agents: List of agents part of this crew.
process: The process flow that the crew will follow (e.g., sequential).
verbose: Indicates the verbosity level for logging during execution.
config: Configuration settings for the crew.
cache_handler: Handles caching for the crew's operations.
id: A unique identifier for the crew instance.
"""
__hash__ = object.__hash__
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: Union[int, bool] = Field(default=0)
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
cache_handler: Optional[InstanceOf[CacheHandler]] = Field(default=CacheHandler())
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
"""Prevent manual setting of the 'id' field by users."""
if v:
raise PydanticCustomError(
"may_not_set_field", "The 'id' field cannot be set by the user.", {}
)
@classmethod
@field_validator("config", mode="before")
def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
return json.loads(v) if isinstance(v, Json) else v
@model_validator(mode="after")
def check_config(self):
"""Validates that the crew is properly configured with agents and tasks."""
if not self.config and not self.tasks and not self.agents:
raise PydanticCustomError(
"missing_keys",
"Either 'agents' and 'tasks' need to be set or 'config'.",
{},
)
if self.config:
self._setup_from_config()
if self.agents:
for agent in self.agents:
agent.set_cache_handler(self.cache_handler)
return self
def _setup_from_config(self):
"""Initializes agents and tasks from the provided config."""
if not self.config.get("agents") or not self.config.get("tasks"):
raise PydanticCustomError(
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
)
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
def _create_task(self, task_config):
"""Creates a task instance from its configuration."""
task_agent = next(
agt for agt in self.agents if agt.role == task_config["agent"]
)
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def kickoff(self) -> str:
"""Starts the crew to work on its assigned tasks."""
for agent in self.agents:
agent.cache_handler = self.cache_handler
if self.process == Process.sequential:
return self._sequential_loop()
def _sequential_loop(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = None
for task in self.tasks:
self._prepare_and_execute_task(task)
task_output = task.execute(task_output)
self._log(
"debug", f"\n\n[{task.agent.role}] Task output: {task_output}\n\n"
)
return task_output
def _prepare_and_execute_task(self, task):
"""Prepares and logs information about the task being executed."""
if task.agent.allow_delegation:
task.tools += AgentTools(agents=self.agents).tools()
self._log("debug", f"Working Agent: {task.agent.role}")
self._log("info", f"Starting Task: {task.description}")
def _log(self, level, message):
"""Logs a message at the specified verbosity level."""
level_map = {"debug": 1, "info": 2}
verbose_level = (
2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
)
if verbose_level and level_map[level] <= verbose_level:
print(message)

11
src/crewai/process.py Normal file
View File

@@ -0,0 +1,11 @@
from enum import Enum
class Process(str, Enum):
"""
Class representing the different processes that can be used to tackle tasks
"""
sequential = "sequential"
# TODO: consensual = 'consensual'
# TODO: hierarchical = 'hierarchical'

57
src/crewai/prompts.py Normal file
View File

@@ -0,0 +1,57 @@
import json
import os
from typing import ClassVar, Dict, Optional
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field, PrivateAttr, ValidationError, model_validator
class Prompts(BaseModel):
"""Manages and generates prompts for a generic agent with support for different languages."""
_prompts: Optional[Dict[str, str]] = PrivateAttr()
language: Optional[str] = Field(
default="en",
description="Language of the prompts.",
)
@model_validator(mode="after")
def load_prompts(self) -> "Prompts":
"""Load prompts from a JSON file based on the specified language."""
try:
dir_path = os.path.dirname(os.path.realpath(__file__))
prompts_path = os.path.join(dir_path, f"prompts/{self.language}.json")
with open(prompts_path, "r") as f:
self._prompts = json.load(f)["slices"]
except FileNotFoundError:
raise ValidationError(
f"Prompt file for language '{self.language}' not found."
)
except json.JSONDecodeError:
raise ValidationError(f"Error decoding JSON from the prompts file.")
return self
SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}"
def task_execution_with_memory(self) -> str:
"""Generate a prompt for task execution with memory components."""
return self._build_prompt(["role_playing", "tools", "memory", "task"])
def task_execution_without_tools(self) -> str:
"""Generate a prompt for task execution without tools components."""
return self._build_prompt(["role_playing", "task"])
def task_execution(self) -> str:
"""Generate a standard prompt for task execution."""
return self._build_prompt(["role_playing", "tools", "task"])
def _build_prompt(self, components: [str]) -> str:
"""Constructs a prompt string from specified components."""
prompt_parts = [
self._prompts[component]
for component in components
if component in self._prompts
]
prompt_parts.append(self.SCRATCHPAD_SLICE)
return PromptTemplate.from_template("".join(prompt_parts))

View File

@@ -0,0 +1,8 @@
{
"slices": {
"task": "Begin! This is VERY important to you, your job depends on it!\n\nCurrent Task: {input}",
"memory": "This is the summary of your work so far:\n{chat_history}",
"role_playing": "You are {role}.\n{backstory}\n\nYour personal goal is: {goal}",
"tools": "TOOLS:\n------\nYou have access to the following tools:\n\n{tools}\n\nTo use a tool, please use the exact following format:\n\n```\nThought: Do I need to use a tool? Yes\nAction: the action to take, should be one of [{tool_names}], just the name.\nAction Input: the input to the action\nObservation: the result of the action\n```\n\nWhen you have a response for your task, or if you do not need to use a tool, you MUST use the format:\n\n```\nThought: Do I need to use a tool? No\nFinal Answer: [your response here]"
}
}

61
src/crewai/task.py Normal file
View File

@@ -0,0 +1,61 @@
import uuid
from typing import Any, List, Optional
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.tasks.task_output import TaskOutput
class Task(BaseModel):
"""Class that represent a task to be executed."""
__hash__ = object.__hash__
description: str = Field(description="Description of the actual task.")
agent: Optional[Agent] = Field(
description="Agent responsible for the task.", default=None
)
tools: List[Any] = Field(
default_factory=list,
description="Tools the agent are limited to use for this task.",
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result.", default=None
)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def check_tools(self):
if not self.tools and (self.agent and self.agent.tools):
self.tools.extend(self.agent.tools)
return self
def execute(self, context: str = None) -> str:
"""Execute the task.
Returns:
Output of the task.
"""
if not self.agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, either consensual or hierarchical."
)
result = self.agent.execute_task(
task=self.description, context=context, tools=self.tools
)
self.output = TaskOutput(description=self.description, result=result)
return result

View File

View File

@@ -0,0 +1,17 @@
from typing import Optional
from pydantic import BaseModel, Field, model_validator
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
result: str = Field(description="Result of the task")
@model_validator(mode="after")
def set_summary(self):
excerpt = " ".join(self.description.split(" ")[:10])
self.summary = f"{excerpt}..."
return self

View File

View File

@@ -0,0 +1,75 @@
from textwrap import dedent
from typing import List
from langchain.tools import Tool
from pydantic import BaseModel, Field
from crewai.agent import Agent
class AgentTools(BaseModel):
"""Default tools around agent delegation"""
agents: List[Agent] = Field(description="List of agents in this crew.")
def tools(self):
return [
Tool.from_function(
func=self.delegate_work,
name="Delegate work to co-worker",
description=dedent(
f"""\
Useful to delegate a specific task to one of the
following co-workers: [{', '.join([agent.role for agent in self.agents])}].
The input to this tool should be a pipe (|) separated text of length
three, representing the co-worker you want to ask it to (one of the options),
the task and all actual context you have for the task.
For example, `coworker|task|context`.
"""
),
),
Tool.from_function(
func=self.ask_question,
name="Ask question to co-worker",
description=dedent(
f"""\
Useful to ask a question, opinion or take from on
of the following co-workers: [{', '.join([agent.role for agent in self.agents])}].
The input to this tool should be a pipe (|) separated text of length
three, representing the co-worker you want to ask it to (one of the options),
the question and all actual context you have for the question.
For example, `coworker|question|context`.
"""
),
),
]
def delegate_work(self, command):
"""Useful to delegate a specific task to a coworker."""
return self.__execute(command)
def ask_question(self, command):
"""Useful to ask a question, opinion or take from a coworker."""
return self.__execute(command)
def __execute(self, command):
"""Execute the command."""
try:
agent, task, context = command.split("|")
except ValueError:
return "\nError executing tool. Missing exact 3 pipe (|) separated values. For example, `coworker|task|context`. I need to make sure to pass context as context\n"
if not agent or not task or not context:
return "\nError executing tool. Missing exact 3 pipe (|) separated values. For example, `coworker|task|context`. I need to make sure to pass context as context.\n"
agent = [
available_agent
for available_agent in self.agents
if available_agent.role == agent
]
if not agent:
return f"\nError executing tool. Co-worker mentioned on the Action Input not found, it must to be one of the following options: {', '.join([agent.role for agent in self.agents])}.\n"
agent = agent[0]
return agent.execute_task(task, context)

View File

@@ -0,0 +1,28 @@
from langchain.tools import Tool
from pydantic import BaseModel, ConfigDict, Field
from crewai.agents.cache import CacheHandler
class CacheTools(BaseModel):
"""Default tools to hit the cache."""
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str = "Hit Cache"
cache_handler: CacheHandler = Field(
description="Cache Handler for the crew",
default=CacheHandler(),
)
def tool(self):
return Tool.from_function(
func=self.hit_cache,
name=self.name,
description="Reads directly from the cache",
)
def hit_cache(self, key):
split = key.split("tool:")
tool = split[1].split("|input:")[0].strip()
tool_input = split[1].split("|input:")[1].strip()
return self.cache_handler.read(tool, tool_input)