feat: Add Train feature for Crews (#686)

* feat: add training logic to agent and crew

* feat: add training logic to agent executor

* feat: add input parameter  to cli command

* feat: add utilities for the training logic

* feat: polish code, logic and add private variables

* feat: add docstring and type hinting to executor

* feat: add constant file, add constant to code

* feat: fix name of training handler function

* feat: remove unused var

* feat: change file handler file name

* feat: Add training handler file, class and change on the code

* feat: fix name error from file

* fix: change import to adapt to logic

* feat: add training handler test

* feat: add tests for file and training_handler

* feat: add test for task evaluator function

* feat: change text to fit in-screen

* feat: add test for train function

* feat: add test for agent training_handler function

* feat: add test for agent._use_trained_data
This commit is contained in:
Eduardo Chiarotti
2024-06-27 02:22:34 -03:00
committed by GitHub
parent 9e61b8325b
commit 175d5b3dd6
15 changed files with 564 additions and 45 deletions

View File

@@ -1,6 +1,6 @@
from copy import deepcopy
import os
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple
from langchain.agents.agent import RunnableAgent
@@ -24,7 +24,9 @@ from pydantic_core import PydanticCustomError
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser, ToolsHandler
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.utilities import I18N, Logger, Prompts, RPMController
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler, TokenProcess
from crewai.utilities.training_handler import CrewTrainingHandler
class Agent(BaseModel):
@@ -98,8 +100,7 @@ class Agent(BaseModel):
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
crew: Any = Field(
default=None, description="Crew to which the agent belongs.")
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
@@ -110,8 +111,7 @@ class Agent(BaseModel):
default=None,
description="Callback to be executed after each step of the agent execution.",
)
i18n: I18N = Field(
default=I18N(), description="Internationalization settings.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")
@@ -172,8 +172,7 @@ class Agent(BaseModel):
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(
self.llm.model_name, self._token_process)
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
# Ensure self.llm.callbacks is a list
if not isinstance(self.llm.callbacks, list):
@@ -236,10 +235,14 @@ class Agent(BaseModel):
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(
parsed_tools)
self.agent_executor.tools_description = render_text_description(parsed_tools)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
if self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
result = self.agent_executor.invoke(
{
"input": task_prompt,
@@ -335,8 +338,7 @@ class Agent(BaseModel):
)
bind = self.llm.bind(stop=stop_words)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(
agent=self)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
@@ -412,6 +414,30 @@ class Agent(BaseModel):
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
human_feedbacks
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():
if trained_data_output := data.get(self.role):
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
trained_data_output["suggestions"]
)
return task_prompt
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -18,8 +18,10 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor):
@@ -246,12 +248,17 @@ class CrewAgentExecutor(AgentExecutor):
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
if self.should_ask_for_human_input:
human_feedback = self._ask_human_input(output.return_values["output"])
if self.crew._train:
self._handle_crew_training_output(output, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.should_ask_for_human_input = False
human_feedback = self._ask_human_input(output.return_values["output"])
action = AgentAction(
tool="Human Input", tool_input=human_feedback, log=output.log
)
yield AgentStep(
action=action,
observation=self._i18n.slice("human_feedback").format(
@@ -261,6 +268,9 @@ class CrewAgentExecutor(AgentExecutor):
return
else:
if self.crew._train:
self._handle_crew_training_output(output)
yield output
return
@@ -305,3 +315,30 @@ class CrewAgentExecutor(AgentExecutor):
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)
def _handle_crew_training_output(
self, output: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.crew_agent.id)
if (
training_data := CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.should_ask_for_human_input
):
if training_data.get(agent_id):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = output.return_values["output"]
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
if self.should_ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": output.return_values["output"],
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.crew_agent.role,
}
CrewTrainingHandler(TRAINING_DATA_FILE).append(
self.crew._train_iteration, agent_id, training_data
)

View File

@@ -15,8 +15,9 @@ def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]))
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")

View File

@@ -27,6 +27,8 @@ from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler
class Crew(BaseModel):
@@ -63,6 +65,8 @@ class Crew(BaseModel):
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -242,6 +246,35 @@ class Crew(BaseModel):
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def _setup_for_training(self) -> None:
"""Sets up the crew for training."""
self._train = True
for task in self.tasks:
task.human_input = True
for agent in self.agents:
agent.allow_delegation = False
def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
"""Trains the crew for a given number of iterations."""
self._setup_for_training()
for n_iteration in range(n_iterations):
self._train_iteration = n_iteration
self.kickoff(inputs=inputs)
training_data = CrewTrainingHandler("training_data.pkl").load()
for agent in self.agents:
result = TaskEvaluator(agent).evaluate_training_data(
training_data=training_data, agent_id=str(agent.id)
)
CrewTrainingHandler("trained_agents_data.pkl").save_trained_data(
agent_id=str(agent.role), trained_data=result.model_dump()
)
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = {},
@@ -328,11 +361,7 @@ class Crew(BaseModel):
return results
def train(self, n_iterations: int) -> None:
# TODO: Implement training
pass
def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
for task in self.tasks:

View File

@@ -1,9 +1,22 @@
from .converter import Converter, ConverterError
from .file_handler import FileHandler
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController
from .fileHandler import FileHandler
from .parser import YamlParser
__all__ = [
"Converter",
"ConverterError",
"FileHandler",
"I18N",
"Instructor",
"Logger",
"Printer",
"Prompts",
"RPMController",
"YamlParser",
]

View File

@@ -0,0 +1,2 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"

View File

@@ -26,6 +26,18 @@ class TaskEvaluation(BaseModel):
)
class TrainingTaskEvaluation(BaseModel):
suggestions: List[str] = Field(
description="Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks."
)
quality: float = Field(
description="A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback."
)
final_summary: str = Field(
description="A step by step action items to improve the next Agent based on the human-feedback and improved output."
)
class TaskEvaluator:
def __init__(self, original_agent):
self.llm = original_agent.llm
@@ -59,3 +71,49 @@ class TaskEvaluator:
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def evaluate_training_data(
self, training_data: dict, agent_id: str
) -> TrainingTaskEvaluation:
"""
Evaluate the training data based on the llm output, human feedback, and improved output.
Parameters:
- training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent.
"""
output_training_data = training_data[agent_id]
final_aggregated_data = ""
for _, data in output_training_data.items():
final_aggregated_data += (
f"Initial Output:\n{data['initial_output']}\n\n"
f"Human Feedback:\n{data['human_feedback']}\n\n"
f"Improved Output:\n{data['improved_output']}\n\n"
)
evaluation_query = (
"Assess the quality of the training data based on the llm output, human feedback , and llm output improved result.\n\n"
f"{final_aggregated_data}"
"Please provide:\n"
"- Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks\n"
"- A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback\n"
)
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(
model=TrainingTaskEvaluation
).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=self.llm,
text=evaluation_query,
model=TrainingTaskEvaluation,
instructions=instructions,
)
pydantic_result = converter.to_pydantic()
return pydantic_result

View File

@@ -1,20 +0,0 @@
import os
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding = 'utf-8') as file:
file.write(message + "\n")

View File

@@ -0,0 +1,69 @@
import os
import pickle
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding="utf-8") as file:
file.write(message + "\n")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""
Initialize the PickleHandler with the name of the file where data will be stored.
The file will be saved in the current directory.
Parameters:
- file_name (str): The name of the file for saving and loading data.
"""
self.file_path = os.path.join(os.getcwd(), file_name)
self._initialize_file()
def _initialize_file(self) -> None:
"""
Initialize the file with an empty dictionary if it does not exist or is empty.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
self.save({}) # Save an empty dictionary to initialize the file
def save(self, data) -> None:
"""
Save the data to the specified file using pickle.
Parameters:
- data (object): The data to be saved.
"""
with open(self.file_path, "wb") as file:
pickle.dump(data, file)
def load(self) -> dict:
"""
Load the data from the specified file using pickle.
Returns:
- dict: The data loaded from the file.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with open(self.file_path, "rb") as file:
try:
return pickle.load(file)
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading

View File

@@ -0,0 +1,31 @@
from crewai.utilities.file_handler import PickleHandler
class CrewTrainingHandler(PickleHandler):
def save_trained_data(self, agent_id: str, trained_data: dict) -> None:
"""
Save the trained data for a specific agent.
Parameters:
- agent_id (str): The ID of the agent.
- trained_data (dict): The trained data to be saved.
"""
data = self.load()
data[agent_id] = trained_data
self.save(data)
def append(self, train_iteration: int, agent_id: str, new_data) -> None:
"""
Append new data to the existing pickle file.
Parameters:
- new_data (object): The new data to be appended.
"""
data = self.load()
if agent_id in data:
data[agent_id][train_iteration] = new_data
else:
data[agent_id] = {train_iteration: new_data}
self.save(data)

View File

@@ -1,5 +1,6 @@
"""Test Agent creation and execution basic functionality."""
from unittest import mock
from unittest.mock import patch
import pytest
@@ -842,3 +843,54 @@ Thought:
"""
)
@patch("crewai.agent.CrewTrainingHandler")
def test_agent_training_handler(crew_training_handler):
task_prompt = "What is 1 + 1?"
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
verbose=True,
)
crew_training_handler().load.return_value = {
f"{str(agent.id)}": {"0": {"human_feedback": "good"}}
}
result = agent._training_handler(task_prompt=task_prompt)
assert result == "What is 1 + 1?You MUST follow these feedbacks: \n good"
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("training_data.pkl"), mock.call().load()]
)
@patch("crewai.agent.CrewTrainingHandler")
def test_agent_use_trained_data(crew_training_handler):
task_prompt = "What is 1 + 1?"
agent = Agent(
role="researcher",
goal="test goal",
backstory="test backstory",
verbose=True,
)
crew_training_handler().load.return_value = {
agent.role: {
"suggestions": [
"The result of the math operatio must be right.",
"Result must be better than 1.",
]
}
}
result = agent._use_trained_data(task_prompt=task_prompt)
assert (
result == "What is 1 + 1?You MUST follow these feedbacks: \n "
"The result of the math operatio must be right.\n - Result must be better than 1."
)
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("trained_agents_data.pkl"), mock.call().load()]
)

View File

@@ -1,6 +1,8 @@
"""Test Agent creation and execution basic functionality."""
import json
from unittest import mock
from unittest.mock import patch
import pydantic_core
import pytest
@@ -1006,7 +1008,10 @@ def test_manager_agent_with_tools_raises_exception():
crew.kickoff()
def test_crew_train_success():
@patch("crewai.crew.Crew.kickoff")
@patch("crewai.crew.CrewTrainingHandler")
@patch("crewai.crew.TaskEvaluator")
def test_crew_train_success(task_evaluator, crew_training_handler, kickoff):
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
@@ -1016,8 +1021,48 @@ def test_crew_train_success():
agents=[researcher, writer],
tasks=[task],
)
crew.train(n_iterations=2, inputs={"topic": "AI"})
task_evaluator.assert_has_calls(
[
mock.call(researcher),
mock.call().evaluate_training_data(
training_data=crew_training_handler().load(),
agent_id=str(researcher.id),
),
mock.call().evaluate_training_data().model_dump(),
mock.call(writer),
mock.call().evaluate_training_data(
training_data=crew_training_handler().load(),
agent_id=str(writer.id),
),
mock.call().evaluate_training_data().model_dump(),
]
)
crew.train(n_iterations=2)
crew_training_handler.assert_has_calls(
[
mock.call("training_data.pkl"),
mock.call().load(),
mock.call("trained_agents_data.pkl"),
mock.call().save_trained_data(
agent_id="Researcher",
trained_data=task_evaluator().evaluate_training_data().model_dump(),
),
mock.call("trained_agents_data.pkl"),
mock.call().save_trained_data(
agent_id="Senior Writer",
trained_data=task_evaluator().evaluate_training_data().model_dump(),
),
mock.call(),
mock.call().load(),
mock.call(),
mock.call().load(),
]
)
kickoff.assert_has_calls(
[mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
)
def test_crew_train_error():
@@ -1036,3 +1081,32 @@ def test_crew_train_error():
assert "train() missing 1 required positional argument: 'n_iterations'" in str(
e
)
def test__setup_for_training():
researcher.allow_delegation = True
writer.allow_delegation = True
agents = [researcher, writer]
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article",
expected_output="5 bullet points with a paragraph for each idea.",
)
crew = Crew(
agents=agents,
tasks=[task],
)
assert crew._train is False
assert task.human_input is False
for agent in agents:
assert agent.allow_delegation is True
crew._setup_for_training()
assert crew._train is True
assert task.human_input is True
for agent in agents:
assert agent.allow_delegation is False

View File

@@ -0,0 +1,64 @@
from unittest import mock
from unittest.mock import MagicMock, patch
from crewai.utilities.evaluators.task_evaluator import (
TaskEvaluator,
TrainingTaskEvaluation,
)
@patch("crewai.utilities.evaluators.task_evaluator.Converter")
def test_evaluate_training_data(converter_mock):
training_data = {
"agent_id": {
"data1": {
"initial_output": "Initial output 1",
"human_feedback": "Human feedback 1",
"improved_output": "Improved output 1",
},
"data2": {
"initial_output": "Initial output 2",
"human_feedback": "Human feedback 2",
"improved_output": "Improved output 2",
},
}
}
agent_id = "agent_id"
original_agent = MagicMock()
function_return_value = TrainingTaskEvaluation(
suggestions=[
"The initial output was already good, having a detailed explanation. However, the improved output "
"gave similar information but in a more professional manner using better vocabulary. For future tasks, "
"try to implement more elaborate language and precise terminology from the beginning."
],
quality=8.0,
final_summary="The agent responded well initially. However, the improved output showed that there is room "
"for enhancement in terms of language usage, precision, and professionalism. For future tasks, the agent "
"should focus more on these points from the start to increase performance.",
)
converter_mock.return_value.to_pydantic.return_value = function_return_value
result = TaskEvaluator(original_agent=original_agent).evaluate_training_data(
training_data, agent_id
)
assert result == function_return_value
converter_mock.assert_has_calls(
[
mock.call(
llm=original_agent.llm,
text="Assess the quality of the training data based on the llm output, human feedback , and llm "
"output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
"1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
"Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- "
"Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs "
"provide action items based on human_feedback for future tasks\n- A score from 0 to 10 evaluating "
"on completion, quality, and overall performance from the improved output to the initial output "
"based on the human feedback\n",
model=TrainingTaskEvaluation,
instructions="I'm gonna convert this raw text into valid JSON.\n\nThe json should have the "
"following structure, with the following keys:\n- suggestions: List[str]\n- "
"quality: float\n- final_summary: str",
),
mock.call().to_pydantic(),
]
)

View File

@@ -0,0 +1,41 @@
import os
import unittest
import pytest
from crewai.utilities.file_handler import PickleHandler
class TestPickleHandler(unittest.TestCase):
def setUp(self):
self.file_name = "test_data.pkl"
self.file_path = os.path.join(os.getcwd(), self.file_name)
self.handler = PickleHandler(self.file_name)
def tearDown(self):
if os.path.exists(self.file_path):
os.remove(self.file_path)
def test_initialize_file(self):
assert os.path.exists(self.file_path) is True
assert os.path.getsize(self.file_path) >= 0
def test_save_and_load(self):
data = {"key": "value"}
self.handler.save(data)
loaded_data = self.handler.load()
assert loaded_data == data
def test_load_empty_file(self):
loaded_data = self.handler.load()
assert loaded_data == {}
def test_load_corrupted_file(self):
with open(self.file_path, "wb") as file:
file.write(b"corrupted data")
with pytest.raises(Exception) as exc:
self.handler.load()
assert str(exc.value) == "pickle data was truncated"
assert "<class '_pickle.UnpicklingError'>" == str(exc.type)

View File

@@ -0,0 +1,42 @@
import os
import unittest
from crewai.utilities.training_handler import CrewTrainingHandler
class TestCrewTrainingHandler(unittest.TestCase):
def setUp(self):
self.handler = CrewTrainingHandler("trained_data.pkl")
def tearDown(self):
os.remove("trained_data.pkl")
del self.handler
def test_save_trained_data(self):
agent_id = "agent1"
trained_data = {"param1": 1, "param2": 2}
self.handler.save_trained_data(agent_id, trained_data)
# Assert that the trained data is saved correctly
data = self.handler.load()
assert data[agent_id] == trained_data
def test_append_existing_agent(self):
train_iteration = 1
agent_id = "agent1"
new_data = {"param3": 3, "param4": 4}
self.handler.append(train_iteration, agent_id, new_data)
# Assert that the new data is appended correctly to the existing agent
data = self.handler.load()
assert data[agent_id][train_iteration] == new_data
def test_append_new_agent(self):
train_iteration = 1
agent_id = "agent2"
new_data = {"param5": 5, "param6": 6}
self.handler.append(train_iteration, agent_id, new_data)
# Assert that the new agent and data are appended correctly
data = self.handler.load()
assert data[agent_id][train_iteration] == new_data