feat: Add Train feature for Crews (#686)

* feat: add training logic to agent and crew

* feat: add training logic to agent executor

* feat: add input parameter  to cli command

* feat: add utilities for the training logic

* feat: polish code, logic and add private variables

* feat: add docstring and type hinting to executor

* feat: add constant file, add constant to code

* feat: fix name of training handler function

* feat: remove unused var

* feat: change file handler file name

* feat: Add training handler file, class and change on the code

* feat: fix name error from file

* fix: change import to adapt to logic

* feat: add training handler test

* feat: add tests for file and training_handler

* feat: add test for task evaluator function

* feat: change text to fit in-screen

* feat: add test for train function

* feat: add test for agent training_handler function

* feat: add test for agent._use_trained_data
This commit is contained in:
Eduardo Chiarotti
2024-06-27 02:22:34 -03:00
committed by GitHub
parent 0594a7f9d8
commit 3573a61568
15 changed files with 564 additions and 45 deletions

View File

@@ -1,6 +1,6 @@
from copy import deepcopy
import os
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple
from langchain.agents.agent import RunnableAgent
@@ -24,7 +24,9 @@ from pydantic_core import PydanticCustomError
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser, ToolsHandler
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.utilities import I18N, Logger, Prompts, RPMController
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler, TokenProcess
from crewai.utilities.training_handler import CrewTrainingHandler
class Agent(BaseModel):
@@ -98,8 +100,7 @@ class Agent(BaseModel):
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
crew: Any = Field(
default=None, description="Crew to which the agent belongs.")
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
@@ -110,8 +111,7 @@ class Agent(BaseModel):
default=None,
description="Callback to be executed after each step of the agent execution.",
)
i18n: I18N = Field(
default=I18N(), description="Internationalization settings.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")
@@ -172,8 +172,7 @@ class Agent(BaseModel):
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(
self.llm.model_name, self._token_process)
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
# Ensure self.llm.callbacks is a list
if not isinstance(self.llm.callbacks, list):
@@ -236,10 +235,14 @@ class Agent(BaseModel):
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(
parsed_tools)
self.agent_executor.tools_description = render_text_description(parsed_tools)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
if self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
result = self.agent_executor.invoke(
{
"input": task_prompt,
@@ -335,8 +338,7 @@ class Agent(BaseModel):
)
bind = self.llm.bind(stop=stop_words)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(
agent=self)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
@@ -371,7 +373,7 @@ class Agent(BaseModel):
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def copy(self):
"""Create a deep copy of the Agent."""
exclude = {
@@ -379,8 +381,8 @@ class Agent(BaseModel):
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"_token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
@@ -412,6 +414,30 @@ class Agent(BaseModel):
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
human_feedbacks
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():
if trained_data_output := data.get(self.role):
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
trained_data_output["suggestions"]
)
return task_prompt
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -18,8 +18,10 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor):
@@ -246,12 +248,17 @@ class CrewAgentExecutor(AgentExecutor):
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
if self.should_ask_for_human_input:
human_feedback = self._ask_human_input(output.return_values["output"])
if self.crew._train:
self._handle_crew_training_output(output, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.should_ask_for_human_input = False
human_feedback = self._ask_human_input(output.return_values["output"])
action = AgentAction(
tool="Human Input", tool_input=human_feedback, log=output.log
)
yield AgentStep(
action=action,
observation=self._i18n.slice("human_feedback").format(
@@ -261,6 +268,9 @@ class CrewAgentExecutor(AgentExecutor):
return
else:
if self.crew._train:
self._handle_crew_training_output(output)
yield output
return
@@ -305,3 +315,30 @@ class CrewAgentExecutor(AgentExecutor):
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)
def _handle_crew_training_output(
self, output: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.crew_agent.id)
if (
training_data := CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.should_ask_for_human_input
):
if training_data.get(agent_id):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = output.return_values["output"]
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
if self.should_ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": output.return_values["output"],
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.crew_agent.role,
}
CrewTrainingHandler(TRAINING_DATA_FILE).append(
self.crew._train_iteration, agent_id, training_data
)

View File

@@ -15,8 +15,9 @@ def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]))
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")

View File

@@ -27,6 +27,8 @@ from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler
class Crew(BaseModel):
@@ -63,6 +65,8 @@ class Crew(BaseModel):
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -242,6 +246,35 @@ class Crew(BaseModel):
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def _setup_for_training(self) -> None:
"""Sets up the crew for training."""
self._train = True
for task in self.tasks:
task.human_input = True
for agent in self.agents:
agent.allow_delegation = False
def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
"""Trains the crew for a given number of iterations."""
self._setup_for_training()
for n_iteration in range(n_iterations):
self._train_iteration = n_iteration
self.kickoff(inputs=inputs)
training_data = CrewTrainingHandler("training_data.pkl").load()
for agent in self.agents:
result = TaskEvaluator(agent).evaluate_training_data(
training_data=training_data, agent_id=str(agent.id)
)
CrewTrainingHandler("trained_agents_data.pkl").save_trained_data(
agent_id=str(agent.role), trained_data=result.model_dump()
)
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = {},
@@ -328,11 +361,7 @@ class Crew(BaseModel):
return results
def train(self, n_iterations: int) -> None:
# TODO: Implement training
pass
def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
for task in self.tasks:

View File

@@ -1,9 +1,22 @@
from .converter import Converter, ConverterError
from .file_handler import FileHandler
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController
from .fileHandler import FileHandler
from .parser import YamlParser
__all__ = [
"Converter",
"ConverterError",
"FileHandler",
"I18N",
"Instructor",
"Logger",
"Printer",
"Prompts",
"RPMController",
"YamlParser",
]

View File

@@ -0,0 +1,2 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"

View File

@@ -26,6 +26,18 @@ class TaskEvaluation(BaseModel):
)
class TrainingTaskEvaluation(BaseModel):
suggestions: List[str] = Field(
description="Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks."
)
quality: float = Field(
description="A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback."
)
final_summary: str = Field(
description="A step by step action items to improve the next Agent based on the human-feedback and improved output."
)
class TaskEvaluator:
def __init__(self, original_agent):
self.llm = original_agent.llm
@@ -59,3 +71,49 @@ class TaskEvaluator:
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def evaluate_training_data(
self, training_data: dict, agent_id: str
) -> TrainingTaskEvaluation:
"""
Evaluate the training data based on the llm output, human feedback, and improved output.
Parameters:
- training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent.
"""
output_training_data = training_data[agent_id]
final_aggregated_data = ""
for _, data in output_training_data.items():
final_aggregated_data += (
f"Initial Output:\n{data['initial_output']}\n\n"
f"Human Feedback:\n{data['human_feedback']}\n\n"
f"Improved Output:\n{data['improved_output']}\n\n"
)
evaluation_query = (
"Assess the quality of the training data based on the llm output, human feedback , and llm output improved result.\n\n"
f"{final_aggregated_data}"
"Please provide:\n"
"- Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks\n"
"- A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback\n"
)
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(
model=TrainingTaskEvaluation
).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=self.llm,
text=evaluation_query,
model=TrainingTaskEvaluation,
instructions=instructions,
)
pydantic_result = converter.to_pydantic()
return pydantic_result

View File

@@ -1,20 +0,0 @@
import os
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding = 'utf-8') as file:
file.write(message + "\n")

View File

@@ -0,0 +1,69 @@
import os
import pickle
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding="utf-8") as file:
file.write(message + "\n")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""
Initialize the PickleHandler with the name of the file where data will be stored.
The file will be saved in the current directory.
Parameters:
- file_name (str): The name of the file for saving and loading data.
"""
self.file_path = os.path.join(os.getcwd(), file_name)
self._initialize_file()
def _initialize_file(self) -> None:
"""
Initialize the file with an empty dictionary if it does not exist or is empty.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
self.save({}) # Save an empty dictionary to initialize the file
def save(self, data) -> None:
"""
Save the data to the specified file using pickle.
Parameters:
- data (object): The data to be saved.
"""
with open(self.file_path, "wb") as file:
pickle.dump(data, file)
def load(self) -> dict:
"""
Load the data from the specified file using pickle.
Returns:
- dict: The data loaded from the file.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with open(self.file_path, "rb") as file:
try:
return pickle.load(file)
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading

View File

@@ -0,0 +1,31 @@
from crewai.utilities.file_handler import PickleHandler
class CrewTrainingHandler(PickleHandler):
def save_trained_data(self, agent_id: str, trained_data: dict) -> None:
"""
Save the trained data for a specific agent.
Parameters:
- agent_id (str): The ID of the agent.
- trained_data (dict): The trained data to be saved.
"""
data = self.load()
data[agent_id] = trained_data
self.save(data)
def append(self, train_iteration: int, agent_id: str, new_data) -> None:
"""
Append new data to the existing pickle file.
Parameters:
- new_data (object): The new data to be appended.
"""
data = self.load()
if agent_id in data:
data[agent_id][train_iteration] = new_data
else:
data[agent_id] = {train_iteration: new_data}
self.save(data)