Compare commits

...

14 Commits

Author                         SHA1        Message                                                   Date
Brandon Hancock (bhancock_ai)  eb84ae5e85  Merge branch 'main' into bugfix/fix-broken-training       2025-01-29 14:18:04 -05:00
Brandon Hancock                25690c861f  simplify training                                         2025-01-29 14:16:44 -05:00
Brandon Hancock (bhancock_ai)  2404baab10  Merge branch 'main' into bugfix/fix-broken-training       2025-01-29 10:12:31 -05:00
Brandon Hancock (bhancock_ai)  d89cdb23ce  Merge branch 'main' into bugfix/fix-broken-training       2025-01-29 10:07:56 -05:00
Brandon Hancock                4ff2252012  Fix type issues pointed out by lorenze                    2025-01-29 09:22:24 -05:00
Brandon Hancock                573d1a8c34  fix failing test                                          2025-01-28 15:15:38 -05:00
Brandon Hancock                315cce7711  drop bad code                                             2025-01-28 15:06:58 -05:00
Brandon Hancock                839f3c4142  add clear                                                 2025-01-28 14:27:57 -05:00
Brandon Hancock                10b42bd0d4  fix failing tests                                         2025-01-28 14:25:10 -05:00
Brandon Hancock                a690bb91b3  Drop comment                                              2025-01-28 13:20:44 -05:00
Brandon Hancock                1f014c1d61  make sure to raise an error when missing training data    2025-01-28 13:17:48 -05:00
Brandon Hancock                164d105fd1  improve prompts                                           2025-01-28 13:05:23 -05:00
Brandon Hancock                a5f2de3e98  Merge branch 'main' into bugfix/fix-broken-training       2025-01-28 12:33:02 -05:00
Brandon Hancock                6ccede42f7  Fixing training while refactoring code                    2025-01-28 11:27:15 -05:00
6 changed files with 210 additions and 137 deletions

View File

@@ -95,18 +95,29 @@ class CrewAgentExecutorMixin:
         pass

     def _ask_human_input(self, final_answer: str) -> str:
-        """Prompt human input for final decision making."""
+        """Prompt human input with mode-appropriate messaging."""
         self._printer.print(
             content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
         )

-        self._printer.print(
-            content=(
+        # Training mode prompt (single iteration)
+        if self.crew and getattr(self.crew, "_train", False):
+            prompt = (
                 "\n\n=====\n"
-                "## Please provide feedback on the Final Result and the Agent's actions. "
-                "Respond with 'looks good' or a similar phrase when you're satisfied.\n"
+                "## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
+                "This will be used to train better versions of the agent.\n"
+                "Please provide detailed feedback about the result quality and reasoning process.\n"
                 "=====\n"
-            ),
-            color="bold_yellow",
-        )
+            )
+        # Regular human-in-the-loop prompt (multiple iterations)
+        else:
+            prompt = (
+                "\n\n=====\n"
+                "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
+                "Respond with 'looks good' to accept or provide specific improvement requests.\n"
+                "You can provide multiple rounds of feedback until satisfied.\n"
+                "=====\n"
+            )
+
+        self._printer.print(content=prompt, color="bold_yellow")
         return input()
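Note: to sanity-check the mode selection above in isolation, here is a minimal sketch; FakeCrew and the shortened prompt strings are hypothetical stand-ins, not crewAI APIs.

class FakeCrew:
    def __init__(self, train: bool) -> None:
        self._train = train

def choose_prompt(crew) -> str:
    # Mirrors the branch in the diff: training mode gets the single-iteration
    # prompt, everything else gets the multi-round human-feedback prompt.
    if crew and getattr(crew, "_train", False):
        return "TRAINING MODE: provide detailed feedback (single round)"
    return "HUMAN FEEDBACK: respond with 'looks good' or request changes"

print(choose_prompt(FakeCrew(train=True)))   # -> training prompt
print(choose_prompt(FakeCrew(train=False)))  # -> regular prompt
print(choose_prompt(None))                   # -> regular prompt (no crew)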

View File

@@ -100,6 +100,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         try:
             formatted_answer = self._invoke_loop()
+        except AssertionError:
+            self._printer.print(
+                content="Agent failed to reach a final answer. This is likely a bug - please report it.",
+                color="red",
+            )
+            raise
         except Exception as e:
             if e.__class__.__module__.startswith("litellm"):
                 # Do not retry on litellm errors
@@ -115,7 +121,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             self._create_long_term_memory(formatted_answer)
         return {"output": formatted_answer.output}

-    def _invoke_loop(self):
+    def _invoke_loop(self) -> AgentFinish:
         """
         Main loop to invoke the agent's thought process until it reaches a conclusion
         or the maximum number of iterations is reached.
@@ -161,6 +167,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             finally:
                 self.iterations += 1

+        # During the invoke loop, formatted_answer alternates between AgentAction
+        # (when the agent is using tools) and eventually becomes AgentFinish
+        # (when the agent reaches a final answer). This assertion confirms we've
+        # reached a final answer and helps type checking understand this transition.
+        assert isinstance(formatted_answer, AgentFinish)
         self._show_logs(formatted_answer)
         return formatted_answer
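Note: the new assertion documents a type-narrowing invariant. Below is a self-contained sketch of the same pattern, using hypothetical stand-in classes rather than crewAI's real AgentAction/AgentFinish.

import random
from typing import Union

class AgentAction:  # hypothetical stand-in for a tool-use step
    pass

class AgentFinish:  # hypothetical stand-in for the final answer
    output: str = "done"

def step(_: Union[AgentAction, AgentFinish]) -> Union[AgentAction, AgentFinish]:
    # Stands in for one LLM/tool round; may or may not finish.
    return AgentFinish() if random.random() < 0.5 else AgentAction()

def invoke_loop() -> AgentFinish:
    answer: Union[AgentAction, AgentFinish] = AgentAction()
    while not isinstance(answer, AgentFinish):
        answer = step(answer)
    # The loop can only exit with an AgentFinish; the assert states that
    # invariant explicitly for readers, and for type checkers when the real
    # control flow (try/except/finally) is too tangled to narrow automatically.
    assert isinstance(answer, AgentFinish)
    return answer

print(invoke_loop().output)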
@@ -292,8 +303,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             self._printer.print(
                 content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
             )
+            description = (
+                getattr(self.task, "description") if self.task else "Not Found"
+            )
             self._printer.print(
-                content=f"\033[95m## Task:\033[00m \033[92m{self.task.description}\033[00m"
+                content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
             )

     def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
@@ -418,58 +432,50 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         )

     def _handle_crew_training_output(
-        self, result: AgentFinish, human_feedback: str | None = None
+        self, result: AgentFinish, human_feedback: Optional[str] = None
     ) -> None:
-        """Function to handle the process of the training data."""
+        """Handle the process of saving training data."""
         agent_id = str(self.agent.id)  # type: ignore
+        train_iteration = (
+            getattr(self.crew, "_train_iteration", None) if self.crew else None
+        )
+
+        if train_iteration is None or not isinstance(train_iteration, int):
+            self._printer.print(
+                content="Invalid or missing train iteration. Cannot save training data.",
+                color="red",
+            )
+            return

         # Load training data
         training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
-        training_data = training_handler.load()
+        training_data = training_handler.load() or {}

-        # Check if training data exists, human input is not requested, and self.crew is valid
-        if training_data and not self.ask_for_human_input:
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if agent_id in training_data and isinstance(train_iteration, int):
-                    training_data[agent_id][train_iteration][
-                        "improved_output"
-                    ] = result.output
-                    training_handler.save(training_data)
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type or agent_id not in training data.",
-                        color="red",
-                    )
-            else:
-                self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
-                    color="red",
-                )
+        # Initialize or retrieve agent's training data
+        agent_training_data = training_data.get(agent_id, {})

-        if self.ask_for_human_input and human_feedback is not None:
-            training_data = {
+        if human_feedback is not None:
+            # Save initial output and human feedback
+            agent_training_data[train_iteration] = {
                 "initial_output": result.output,
                 "human_feedback": human_feedback,
                 "agent": agent_id,
                 "agent_role": self.agent.role,  # type: ignore
             }
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if isinstance(train_iteration, int):
-                    CrewTrainingHandler(TRAINING_DATA_FILE).append(
-                        train_iteration, agent_id, training_data
-                    )
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type. Expected int.",
-                        color="red",
-                    )
-            else:
+        else:
+            # Save improved output
+            if train_iteration in agent_training_data:
+                agent_training_data[train_iteration]["improved_output"] = result.output
+            else:
                 self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
+                    content=(
+                        f"No existing training data for agent {agent_id} and iteration "
+                        f"{train_iteration}. Cannot save improved output."
+                    ),
                     color="red",
                 )
+                return
+
+        # Update the training data and save
+        training_data[agent_id] = agent_training_data
+        training_handler.save(training_data)

     def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
         prompt = prompt.replace("{input}", inputs["input"])
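Note: the refactor above implies a nesting for the TRAINING_DATA_FILE contents: agent id at the top level, then iteration, then the record fields. This inferred shape (not an official schema, values made up) looks like:

training_data = {
    "agent-uuid-1": {
        0: {
            "initial_output": "first draft answer",
            "human_feedback": "tighten the summary",
            "agent": "agent-uuid-1",
            "agent_role": "Researcher",
            "improved_output": "revised answer",  # filled in by the second call
        },
    },
}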
@@ -485,82 +491,103 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         return {"role": role, "content": prompt}

     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-        """
-        Handles the human feedback loop, allowing the user to provide feedback
-        on the agent's output and determining if additional iterations are needed.
+        """Handle human feedback with different flows for training vs regular use.

-        Parameters:
-            formatted_answer (AgentFinish): The initial output from the agent.
+        Args:
+            formatted_answer: The initial AgentFinish result to get feedback on

         Returns:
-            AgentFinish: The final output after incorporating human feedback.
+            AgentFinish: The final answer after processing feedback
         """
+        human_feedback = self._ask_human_input(formatted_answer.output)
+
+        if self._is_training_mode():
+            return self._handle_training_feedback(formatted_answer, human_feedback)
+
+        return self._handle_regular_feedback(formatted_answer, human_feedback)
+
+    def _is_training_mode(self) -> bool:
+        """Check if crew is in training mode."""
+        return bool(self.crew and self.crew._train)
+
+    def _handle_training_feedback(
+        self, initial_answer: AgentFinish, feedback: str
+    ) -> AgentFinish:
+        """Process feedback for training scenarios with single iteration."""
+        self._printer.print(
+            content="\nProcessing training feedback.\n",
+            color="yellow",
+        )
+        self._handle_crew_training_output(initial_answer, feedback)
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        improved_answer = self._invoke_loop()
+        self._handle_crew_training_output(improved_answer)
+        self.ask_for_human_input = False
+        return improved_answer
+
+    def _handle_regular_feedback(
+        self, current_answer: AgentFinish, initial_feedback: str
+    ) -> AgentFinish:
+        """Process feedback for regular use with potential multiple iterations."""
+        feedback = initial_feedback
+        answer = current_answer
+
         while self.ask_for_human_input:
-            human_feedback = self._ask_human_input(formatted_answer.output)
+            response = self._get_llm_feedback_response(feedback)

-            if self.crew and self.crew._train:
-                self._handle_crew_training_output(formatted_answer, human_feedback)
-
-            # Make an LLM call to verify if additional changes are requested based on human feedback
-            additional_changes_prompt = self._i18n.slice(
-                "human_feedback_classification"
-            ).format(feedback=human_feedback)
-
-            retry_count = 0
-            llm_call_successful = False
-            additional_changes_response = None
-
-            while retry_count < MAX_LLM_RETRY and not llm_call_successful:
-                try:
-                    additional_changes_response = (
-                        self.llm.call(
-                            [
-                                self._format_msg(
-                                    additional_changes_prompt, role="system"
-                                )
-                            ],
-                            callbacks=self.callbacks,
-                        )
-                        .strip()
-                        .lower()
-                    )
-                    llm_call_successful = True
-                except Exception as e:
-                    retry_count += 1
-                    self._printer.print(
-                        content=f"Error during LLM call to classify human feedback: {e}. Retrying... ({retry_count}/{MAX_LLM_RETRY})",
-                        color="red",
-                    )
-
-            if not llm_call_successful:
-                self._printer.print(
-                    content="Error processing feedback after multiple attempts.",
-                    color="red",
-                )
+            if not self._feedback_requires_changes(response):
                 self.ask_for_human_input = False
                 break

-            if additional_changes_response == "false":
-                self.ask_for_human_input = False
-            elif additional_changes_response == "true":
-                self.ask_for_human_input = True
-                # Add human feedback to messages
-                self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
-                # Invoke the loop again with updated messages
-                formatted_answer = self._invoke_loop()
-
-                if self.crew and self.crew._train:
-                    self._handle_crew_training_output(formatted_answer)
-            else:
-                # Unexpected response
-                self._printer.print(
-                    content=f"Unexpected response from LLM: '{additional_changes_response}'. Assuming no additional changes requested.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
+            answer = self._process_feedback_iteration(feedback)
+            feedback = self._ask_human_input(answer.output)

-        return formatted_answer
+        return answer
+
+    def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
+        """Get LLM classification of whether feedback requires changes."""
+        prompt = self._i18n.slice("human_feedback_classification").format(
+            feedback=feedback
+        )
+        message = self._format_msg(prompt, role="system")
+
+        for retry in range(MAX_LLM_RETRY):
+            try:
+                response = self.llm.call([message], callbacks=self.callbacks)
+                return response.strip().lower() if response else None
+            except Exception as error:
+                self._log_feedback_error(retry, error)
+
+        self._log_max_retries_exceeded()
+        return None
+
+    def _feedback_requires_changes(self, response: Optional[str]) -> bool:
+        """Determine if feedback response indicates need for changes."""
+        return response == "true" if response else False
+
+    def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
+        """Process a single feedback iteration."""
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        return self._invoke_loop()
+
+    def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
+        """Log feedback processing errors."""
+        self._printer.print(
+            content=(
+                f"Error processing feedback: {error}. "
+                f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
+            ),
+            color="red",
+        )
+
+    def _log_max_retries_exceeded(self) -> None:
+        """Log when max retries for feedback processing are exceeded."""
+        self._printer.print(
+            content=(
+                f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
+                "Ending feedback loop."
+            ),
+            color="red",
+        )

     def _handle_max_iterations_exceeded(self, formatted_answer):
         """

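Note: the helpers above replace the inline retry loop with a bounded retry plus a strict "true"/"false" parse, so a failed classification ends the feedback loop instead of spinning forever. Below is a standalone sketch of that pattern with an injected fake LLM; the function names and MAX_LLM_RETRY value here are illustrative assumptions.

from typing import Callable, Optional

MAX_LLM_RETRY = 3  # illustrative; the real constant lives in crewAI

def classify_feedback(feedback: str, llm: Callable[[str], str]) -> Optional[str]:
    prompt = f"Does this feedback request changes? Answer true/false: {feedback}"
    for _ in range(MAX_LLM_RETRY):
        try:
            response = llm(prompt)
            return response.strip().lower() if response else None
        except Exception:
            continue  # the real code logs each failed attempt
    return None  # retries exhausted; caller treats None as "no changes"

def requires_changes(response: Optional[str]) -> bool:
    # Strict parse: only the literal "true" keeps the loop going, so a
    # failed or garbled classification can never loop indefinitely.
    return response == "true"

print(requires_changes(classify_feedback("looks good", lambda p: "false")))   # False
print(requires_changes(classify_feedback("add sources", lambda p: "True ")))  # True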
View File

@@ -494,21 +494,26 @@ class Crew(BaseModel):
         train_crew = self.copy()
         train_crew._setup_for_training(filename)

-        for n_iteration in range(n_iterations):
-            train_crew._train_iteration = n_iteration
-            train_crew.kickoff(inputs=inputs)
+        try:
+            for n_iteration in range(n_iterations):
+                train_crew._train_iteration = n_iteration
+                train_crew.kickoff(inputs=inputs)

-        training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
+            training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()

-        for agent in train_crew.agents:
-            if training_data.get(str(agent.id)):
-                result = TaskEvaluator(agent).evaluate_training_data(
-                    training_data=training_data, agent_id=str(agent.id)
-                )
-
-                CrewTrainingHandler(filename).save_trained_data(
-                    agent_id=str(agent.role), trained_data=result.model_dump()
-                )
+            for agent in train_crew.agents:
+                if training_data.get(str(agent.id)):
+                    result = TaskEvaluator(agent).evaluate_training_data(
+                        training_data=training_data, agent_id=str(agent.id)
+                    )
+                    CrewTrainingHandler(filename).save_trained_data(
+                        agent_id=str(agent.role), trained_data=result.model_dump()
+                    )
+        except Exception as e:
+            self._logger.log("error", f"Training failed: {e}", color="red")
+            CrewTrainingHandler(TRAINING_DATA_FILE).clear()
+            CrewTrainingHandler(filename).clear()
+            raise

     def kickoff(
         self,

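Note: the try/except wrapper above clears partial training files when any iteration fails and then re-raises. A sketch of how train() is typically driven follows; the Agent/Task/Crew fields and the train() signature are based on the documented crewAI API and may differ slightly across versions, so treat this as an assumption rather than a reference.

from crewai import Agent, Crew, Task  # assumes crewai is installed

researcher = Agent(role="Researcher", goal="Summarize a topic", backstory="...")
task = Task(description="Summarize {topic}", expected_output="A short summary",
            agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])

try:
    # Each iteration runs kickoff(), collects human feedback, and stores it;
    # on any failure the new except block clears partial training files.
    crew.train(n_iterations=2, filename="trained_agents_data.pkl",
               inputs={"topic": "AI agents"})
except Exception as err:
    print(f"Training aborted and training files cleared: {err}")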
View File

@@ -92,13 +92,34 @@ class TaskEvaluator:
         """
         output_training_data = training_data[agent_id]

         final_aggregated_data = ""
-        for _, data in output_training_data.items():
+        for iteration, data in output_training_data.items():
+            improved_output = data.get("improved_output")
+            initial_output = data.get("initial_output")
+            human_feedback = data.get("human_feedback")
+
+            if not all([improved_output, initial_output, human_feedback]):
+                missing_fields = [
+                    field
+                    for field in ["improved_output", "initial_output", "human_feedback"]
+                    if not data.get(field)
+                ]
+                error_msg = (
+                    f"Critical training data error: Missing fields ({', '.join(missing_fields)}) "
+                    f"for agent {agent_id} in iteration {iteration}.\n"
+                    "This indicates a broken training process. "
+                    "Cannot proceed with evaluation.\n"
+                    "Please check your training implementation."
+                )
+                raise ValueError(error_msg)
+
             final_aggregated_data += (
-                f"Initial Output:\n{data.get('initial_output', '')}\n\n"
-                f"Human Feedback:\n{data.get('human_feedback', '')}\n\n"
-                f"Improved Output:\n{data.get('improved_output', '')}\n\n"
+                f"Iteration: {iteration}\n"
+                f"Initial Output:\n{initial_output}\n\n"
+                f"Human Feedback:\n{human_feedback}\n\n"
+                f"Improved Output:\n{improved_output}\n\n"
+                "------------------------------------------------\n\n"
             )

         evaluation_query = (

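Note: with the new validation, every aggregated block is complete and labeled by its iteration. A worked example of the block format for one record, mirroring the f-string in the diff (values made up):

data = {
    "initial_output": "Initial output 1",
    "human_feedback": "Human feedback 1",
    "improved_output": "Improved output 1",
}
block = (
    f"Iteration: 0\n"
    f"Initial Output:\n{data['initial_output']}\n\n"
    f"Human Feedback:\n{data['human_feedback']}\n\n"
    f"Improved Output:\n{data['improved_output']}\n\n"
    "------------------------------------------------\n\n"
)
print(block)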
View File

@@ -1,3 +1,5 @@
+import os
+
 from crewai.utilities.file_handler import PickleHandler
@@ -29,3 +31,10 @@ class CrewTrainingHandler(PickleHandler):
             data[agent_id] = {train_iteration: new_data}

         self.save(data)
+
+    def clear(self) -> None:
+        """Clear the training data by removing the file or resetting its contents."""
+        if os.path.exists(self.file_path):
+            with open(self.file_path, "wb") as file:
+                # Overwrite with an empty dictionary
+                self.save({})

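Note: the new clear() resets the pickle file to an empty dict rather than deleting it. A minimal sketch of the same semantics follows, assuming PickleHandler simply pickles a dict to self.file_path (its real internals are not shown in this diff); MiniHandler is a hypothetical stand-in.

import os
import pickle

class MiniHandler:
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path

    def save(self, data: dict) -> None:
        with open(self.file_path, "wb") as fh:
            pickle.dump(data, fh)

    def clear(self) -> None:
        # Reset contents to an empty dict instead of removing the file.
        if os.path.exists(self.file_path):
            self.save({})

h = MiniHandler("training_data.pkl")
h.save({"agent": {0: {"initial_output": "draft"}}})
h.clear()
with open("training_data.pkl", "rb") as fh:
    assert pickle.load(fh) == {}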
View File

@@ -48,9 +48,9 @@ def test_evaluate_training_data(converter_mock):
         mock.call(
             llm=original_agent.llm,
             text="Assess the quality of the training data based on the llm output, human feedback , and llm "
-            "output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
-            "1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
-            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- Provide "
+            "output improved result.\n\nIteration: data1\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
+            "1\n\nImproved Output:\nImproved output 1\n\n------------------------------------------------\n\nIteration: data2\nInitial Output:\nInitial output 2\n\nHuman "
+            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\n------------------------------------------------\n\nPlease provide:\n- Provide "
             "a list of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's "
             "performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific "
             "action items for future tasks. Ensure all key and specificpoints from the human feedback are "