mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
75 lines
2.6 KiB
Python
75 lines
2.6 KiB
Python
from textwrap import dedent
|
|
from typing import List
|
|
|
|
from langchain.tools import Tool
|
|
from pydantic.v1 import BaseModel, Field
|
|
|
|
from ..agent import Agent
|
|
|
|
|
|
class AgentTools(BaseModel):
|
|
"""Tools for generic agent."""
|
|
|
|
agents: List[Agent] = Field(description="List of agents in this crew.")
|
|
|
|
def tools(self):
|
|
return [
|
|
Tool.from_function(
|
|
func=self.delegate_work,
|
|
name="Delegate Work to Co-Worker",
|
|
description=dedent(
|
|
f"""Useful to delegate a specific task to one of the
|
|
following co-workers: [{', '.join([agent.role for agent in self.agents])}].
|
|
The input to this tool should be a pipe (|) separated text of length
|
|
three, representing the role you want to delegate it to, the task and
|
|
information necessary. For example, `coworker|task|information`.
|
|
"""
|
|
),
|
|
),
|
|
Tool.from_function(
|
|
func=self.ask_question,
|
|
name="Ask Question to Co-Worker",
|
|
description=dedent(
|
|
f"""Useful to ask a question, opinion or take from on
|
|
of the following co-workers: [{', '.join([agent.role for agent in self.agents])}].
|
|
The input to this tool should be a pipe (|) separated text of length
|
|
three, representing the role you want to ask it to, the question and
|
|
information necessary. For example, `coworker|question|information`.
|
|
"""
|
|
),
|
|
),
|
|
]
|
|
|
|
def delegate_work(self, command):
|
|
"""Useful to delegate a specific task to a coworker."""
|
|
return self.__execute(command)
|
|
|
|
def ask_question(self, command):
|
|
"""Useful to ask a question, opinion or take from a coworker."""
|
|
return self.__execute(command)
|
|
|
|
def __execute(self, command):
|
|
"""Execute the command."""
|
|
try:
|
|
agent, task, information = command.split("|")
|
|
except ValueError:
|
|
return "Error executing tool. Missing exact 3 pipe (|) separated values. For example, `coworker|task|information`."
|
|
|
|
if not agent or not task or not information:
|
|
return "Error executing tool. Missing exact 3 pipe (|) separated values. For example, `coworker|question|information`."
|
|
|
|
agent = [
|
|
available_agent
|
|
for available_agent in self.agents
|
|
if available_agent.role == agent
|
|
]
|
|
|
|
if len(agent) == 0:
|
|
return (
|
|
"Error executing tool. Co-worker not found, double check the co-worker."
|
|
)
|
|
|
|
agent = agent[0]
|
|
result = agent.execute_task(task, information)
|
|
return result
|