mirror of https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00

Compare commits: devin/1744...bugfix/fix (14 commits)
| SHA1 |
|---|
| eb84ae5e85 |
| 25690c861f |
| 2404baab10 |
| d89cdb23ce |
| 4ff2252012 |
| 573d1a8c34 |
| 315cce7711 |
| 839f3c4142 |
| 10b42bd0d4 |
| a690bb91b3 |
| 1f014c1d61 |
| 164d105fd1 |
| a5f2de3e98 |
| 6ccede42f7 |
```diff
@@ -95,18 +95,29 @@ class CrewAgentExecutorMixin:
         pass
 
     def _ask_human_input(self, final_answer: str) -> str:
-        """Prompt human input for final decision making."""
+        """Prompt human input with mode-appropriate messaging."""
         self._printer.print(
             content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
         )
 
-        self._printer.print(
-            content=(
-                "\n\n=====\n"
-                "## Please provide feedback on the Final Result and the Agent's actions. "
-                "Respond with 'looks good' or a similar phrase when you're satisfied.\n"
-                "=====\n"
-            ),
-            color="bold_yellow",
-        )
+        # Training mode prompt (single iteration)
+        if self.crew and getattr(self.crew, "_train", False):
+            prompt = (
+                "\n\n=====\n"
+                "## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
+                "This will be used to train better versions of the agent.\n"
+                "Please provide detailed feedback about the result quality and reasoning process.\n"
+                "=====\n"
+            )
+        # Regular human-in-the-loop prompt (multiple iterations)
+        else:
+            prompt = (
+                "\n\n=====\n"
+                "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
+                "Respond with 'looks good' to accept or provide specific improvement requests.\n"
+                "You can provide multiple rounds of feedback until satisfied.\n"
+                "=====\n"
+            )
+
+        self._printer.print(content=prompt, color="bold_yellow")
         return input()
```
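The `getattr(self.crew, "_train", False)` guard keeps the method safe when the crew lacks the private `_train` flag. A minimal sketch of the same branching outside the class (the `select_feedback_prompt` helper and `_Crew` stub are hypothetical, not part of the PR):

```python
# Minimal sketch of the prompt selection above; select_feedback_prompt
# is a hypothetical helper, not part of the PR.
def select_feedback_prompt(crew: object) -> str:
    # getattr with a default keeps this safe when `_train` is absent.
    if crew and getattr(crew, "_train", False):
        return "## TRAINING MODE: Provide feedback to improve the agent's performance."
    return "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions."


class _Crew:
    _train = True


print(select_feedback_prompt(_Crew()))   # training prompt
print(select_feedback_prompt(object()))  # regular prompt
```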
```diff
@@ -100,6 +100,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
 
         try:
             formatted_answer = self._invoke_loop()
+        except AssertionError:
+            self._printer.print(
+                content="Agent failed to reach a final answer. This is likely a bug - please report it.",
+                color="red",
+            )
+            raise
         except Exception as e:
             if e.__class__.__module__.startswith("litellm"):
                 # Do not retry on litellm errors
```
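Note the module-name check on the following `except` branch: it detects litellm-raised errors without importing litellm's exception hierarchy. A standalone illustration of that idiom (the `ProviderError` class is invented; its `__module__` is overridden to simulate a litellm-defined exception):

```python
# Standalone illustration of filtering exceptions by defining module.
# ProviderError stands in for a litellm exception type.
class ProviderError(Exception):
    pass


ProviderError.__module__ = "litellm.exceptions"  # simulate a litellm-defined error

try:
    raise ProviderError("rate limited")
except Exception as e:
    if e.__class__.__module__.startswith("litellm"):
        print("provider error - do not retry:", e)
    else:
        raise
```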
```diff
@@ -115,7 +121,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         self._create_long_term_memory(formatted_answer)
         return {"output": formatted_answer.output}
 
-    def _invoke_loop(self):
+    def _invoke_loop(self) -> AgentFinish:
         """
         Main loop to invoke the agent's thought process until it reaches a conclusion
         or the maximum number of iterations is reached.
```
```diff
@@ -161,6 +167,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             finally:
                 self.iterations += 1
 
+        # During the invoke loop, formatted_answer alternates between AgentAction
+        # (when the agent is using tools) and eventually becomes AgentFinish
+        # (when the agent reaches a final answer). This assertion confirms we've
+        # reached a final answer and helps type checking understand this transition.
+        assert isinstance(formatted_answer, AgentFinish)
         self._show_logs(formatted_answer)
         return formatted_answer
```
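The comment block explains the intent; as a side effect, `assert isinstance(...)` also narrows the `Union[AgentAction, AgentFinish]` return type for static checkers. A minimal self-contained sketch of that narrowing pattern (stand-in dataclasses, not the real crewAI types):

```python
# Minimal sketch of isinstance-narrowing with stand-in types.
from dataclasses import dataclass
from typing import Union


@dataclass
class AgentAction:
    tool: str


@dataclass
class AgentFinish:
    output: str


def run_loop() -> Union[AgentAction, AgentFinish]:
    answer: Union[AgentAction, AgentFinish] = AgentAction(tool="search")
    answer = AgentFinish(output="done")  # the loop eventually finishes
    return answer


result = run_loop()
assert isinstance(result, AgentFinish)  # narrows the union for type checkers
print(result.output)  # safe: statically known to be AgentFinish here
```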
```diff
@@ -292,8 +303,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
             self._printer.print(
                 content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{agent_role}\033[00m"
             )
+            description = (
+                getattr(self.task, "description") if self.task else "Not Found"
+            )
             self._printer.print(
-                content=f"\033[95m## Task:\033[00m \033[92m{self.task.description}\033[00m"
+                content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
             )
 
     def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
```
```diff
@@ -418,58 +432,50 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         )
 
     def _handle_crew_training_output(
-        self, result: AgentFinish, human_feedback: str | None = None
+        self, result: AgentFinish, human_feedback: Optional[str] = None
     ) -> None:
-        """Function to handle the process of the training data."""
+        """Handle the process of saving training data."""
         agent_id = str(self.agent.id)  # type: ignore
+        train_iteration = (
+            getattr(self.crew, "_train_iteration", None) if self.crew else None
+        )
+
+        if train_iteration is None or not isinstance(train_iteration, int):
+            self._printer.print(
+                content="Invalid or missing train iteration. Cannot save training data.",
+                color="red",
+            )
+            return
 
         # Load training data
         training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
-        training_data = training_handler.load()
+        training_data = training_handler.load() or {}
 
-        # Check if training data exists, human input is not requested, and self.crew is valid
-        if training_data and not self.ask_for_human_input:
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if agent_id in training_data and isinstance(train_iteration, int):
-                    training_data[agent_id][train_iteration][
-                        "improved_output"
-                    ] = result.output
-                    training_handler.save(training_data)
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type or agent_id not in training data.",
-                        color="red",
-                    )
-            else:
-                self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
-                    color="red",
-                )
+        # Initialize or retrieve agent's training data
+        agent_training_data = training_data.get(agent_id, {})
 
-        if self.ask_for_human_input and human_feedback is not None:
-            training_data = {
+        if human_feedback is not None:
+            # Save initial output and human feedback
+            agent_training_data[train_iteration] = {
                 "initial_output": result.output,
                 "human_feedback": human_feedback,
                 "agent": agent_id,
                 "agent_role": self.agent.role,  # type: ignore
             }
-            if self.crew is not None and hasattr(self.crew, "_train_iteration"):
-                train_iteration = self.crew._train_iteration
-                if isinstance(train_iteration, int):
-                    CrewTrainingHandler(TRAINING_DATA_FILE).append(
-                        train_iteration, agent_id, training_data
-                    )
-                else:
-                    self._printer.print(
-                        content="Invalid train iteration type. Expected int.",
-                        color="red",
-                    )
-            else:
-                self._printer.print(
-                    content="Crew is None or does not have _train_iteration attribute.",
-                    color="red",
-                )
+        else:
+            # Save improved output
+            if train_iteration in agent_training_data:
+                agent_training_data[train_iteration]["improved_output"] = result.output
+            else:
+                self._printer.print(
+                    content=(
+                        f"No existing training data for agent {agent_id} and iteration "
+                        f"{train_iteration}. Cannot save improved output."
+                    ),
+                    color="red",
+                )
+                return
+
+        # Update the training data and save
+        training_data[agent_id] = agent_training_data
+        training_handler.save(training_data)
 
     def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
         prompt = prompt.replace("{input}", inputs["input"])
```
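With this change, every write funnels through a single `training_handler.save(training_data)` keyed by agent id and then iteration. Illustratively, the saved structure looks roughly like this (ids and strings invented; field names taken from the diff):

```python
# Illustrative shape of the saved training data (ids/strings invented).
training_data = {
    "agent-uuid-1234": {
        0: {  # train_iteration
            "initial_output": "First draft of the answer",
            "human_feedback": "Cite sources and shorten the intro",
            "agent": "agent-uuid-1234",
            "agent_role": "Research Analyst",
            # Added on the follow-up pass, once feedback was applied:
            "improved_output": "Revised answer with citations",
        },
    },
}
```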
```diff
@@ -485,82 +491,103 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
         return {"role": role, "content": prompt}
 
     def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
-        """
-        Handles the human feedback loop, allowing the user to provide feedback
-        on the agent's output and determining if additional iterations are needed.
+        """Handle human feedback with different flows for training vs regular use.
 
-        Parameters:
-            formatted_answer (AgentFinish): The initial output from the agent.
+        Args:
+            formatted_answer: The initial AgentFinish result to get feedback on
 
         Returns:
-            AgentFinish: The final output after incorporating human feedback.
+            AgentFinish: The final answer after processing feedback
         """
+        human_feedback = self._ask_human_input(formatted_answer.output)
+
+        if self._is_training_mode():
+            return self._handle_training_feedback(formatted_answer, human_feedback)
+
+        return self._handle_regular_feedback(formatted_answer, human_feedback)
+
+    def _is_training_mode(self) -> bool:
+        """Check if crew is in training mode."""
+        return bool(self.crew and self.crew._train)
+
+    def _handle_training_feedback(
+        self, initial_answer: AgentFinish, feedback: str
+    ) -> AgentFinish:
+        """Process feedback for training scenarios with single iteration."""
+        self._printer.print(
+            content="\nProcessing training feedback.\n",
+            color="yellow",
+        )
+        self._handle_crew_training_output(initial_answer, feedback)
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        improved_answer = self._invoke_loop()
+        self._handle_crew_training_output(improved_answer)
+        self.ask_for_human_input = False
+        return improved_answer
+
+    def _handle_regular_feedback(
+        self, current_answer: AgentFinish, initial_feedback: str
+    ) -> AgentFinish:
+        """Process feedback for regular use with potential multiple iterations."""
+        feedback = initial_feedback
+        answer = current_answer
+
         while self.ask_for_human_input:
-            human_feedback = self._ask_human_input(formatted_answer.output)
+            response = self._get_llm_feedback_response(feedback)
 
-            if self.crew and self.crew._train:
-                self._handle_crew_training_output(formatted_answer, human_feedback)
-
-            # Make an LLM call to verify if additional changes are requested based on human feedback
-            additional_changes_prompt = self._i18n.slice(
-                "human_feedback_classification"
-            ).format(feedback=human_feedback)
-
-            retry_count = 0
-            llm_call_successful = False
-            additional_changes_response = None
-
-            while retry_count < MAX_LLM_RETRY and not llm_call_successful:
-                try:
-                    additional_changes_response = (
-                        self.llm.call(
-                            [
-                                self._format_msg(
-                                    additional_changes_prompt, role="system"
-                                )
-                            ],
-                            callbacks=self.callbacks,
-                        )
-                        .strip()
-                        .lower()
-                    )
-                    llm_call_successful = True
-                except Exception as e:
-                    retry_count += 1
-
-                    self._printer.print(
-                        content=f"Error during LLM call to classify human feedback: {e}. Retrying... ({retry_count}/{MAX_LLM_RETRY})",
-                        color="red",
-                    )
-
-            if not llm_call_successful:
-                self._printer.print(
-                    content="Error processing feedback after multiple attempts.",
-                    color="red",
-                )
+            if not self._feedback_requires_changes(response):
                 self.ask_for_human_input = False
-                break
-
-            if additional_changes_response == "false":
-                self.ask_for_human_input = False
-            elif additional_changes_response == "true":
-                self.ask_for_human_input = True
-                # Add human feedback to messages
-                self.messages.append(self._format_msg(f"Feedback: {human_feedback}"))
-                # Invoke the loop again with updated messages
-                formatted_answer = self._invoke_loop()
-
-                if self.crew and self.crew._train:
-                    self._handle_crew_training_output(formatted_answer)
             else:
-                # Unexpected response
-                self._printer.print(
-                    content=f"Unexpected response from LLM: '{additional_changes_response}'. Assuming no additional changes requested.",
-                    color="red",
-                )
-                self.ask_for_human_input = False
+                answer = self._process_feedback_iteration(feedback)
+                feedback = self._ask_human_input(answer.output)
 
-        return formatted_answer
+        return answer
+
+    def _get_llm_feedback_response(self, feedback: str) -> Optional[str]:
+        """Get LLM classification of whether feedback requires changes."""
+        prompt = self._i18n.slice("human_feedback_classification").format(
+            feedback=feedback
+        )
+        message = self._format_msg(prompt, role="system")
+
+        for retry in range(MAX_LLM_RETRY):
+            try:
+                response = self.llm.call([message], callbacks=self.callbacks)
+                return response.strip().lower() if response else None
+            except Exception as error:
+                self._log_feedback_error(retry, error)
+
+        self._log_max_retries_exceeded()
+        return None
+
+    def _feedback_requires_changes(self, response: Optional[str]) -> bool:
+        """Determine if feedback response indicates need for changes."""
+        return response == "true" if response else False
+
+    def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
+        """Process a single feedback iteration."""
+        self.messages.append(self._format_msg(f"Feedback: {feedback}"))
+        return self._invoke_loop()
+
+    def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
+        """Log feedback processing errors."""
+        self._printer.print(
+            content=(
+                f"Error processing feedback: {error}. "
+                f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
+            ),
+            color="red",
+        )
+
+    def _log_max_retries_exceeded(self) -> None:
+        """Log when max retries for feedback processing are exceeded."""
+        self._printer.print(
+            content=(
+                f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
+                "Ending feedback loop."
+            ),
+            color="red",
+        )
 
     def _handle_max_iterations_exceeded(self, formatted_answer):
         """
```
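The monolithic while-loop is split into helpers: classify the feedback via LLM, check whether changes are requested, rerun the agent, ask again. Reduced to its control flow, the regular path behaves roughly like this sketch (the `classify`/`rerun`/`ask` stubs are invented stand-ins for the real methods):

```python
# Rough standalone model of _handle_regular_feedback's control flow.
# classify/rerun/ask are invented stubs standing in for the real helpers.
from typing import Callable


def feedback_loop(
    answer: str,
    feedback: str,
    classify: Callable[[str], str],  # returns "true" if changes requested
    rerun: Callable[[str], str],     # re-invokes the agent with feedback
    ask: Callable[[str], str],       # prompts the human again
) -> str:
    asking = True
    while asking:
        if classify(feedback) != "true":  # also stops on None/unexpected output
            asking = False
        else:
            answer = rerun(feedback)
            feedback = ask(answer)
    return answer


# One round of requested changes, then acceptance:
responses = iter(["true", "false"])
final = feedback_loop(
    answer="v1",
    feedback="please fix the title",
    classify=lambda fb: next(responses),
    rerun=lambda fb: "v2",
    ask=lambda out: "looks good",
)
print(final)  # -> v2
```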
```diff
@@ -494,21 +494,26 @@ class Crew(BaseModel):
         train_crew = self.copy()
         train_crew._setup_for_training(filename)
 
-        for n_iteration in range(n_iterations):
-            train_crew._train_iteration = n_iteration
-            train_crew.kickoff(inputs=inputs)
+        try:
+            for n_iteration in range(n_iterations):
+                train_crew._train_iteration = n_iteration
+                train_crew.kickoff(inputs=inputs)
 
-        training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
+            training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
 
-        for agent in train_crew.agents:
-            if training_data.get(str(agent.id)):
-                result = TaskEvaluator(agent).evaluate_training_data(
-                    training_data=training_data, agent_id=str(agent.id)
-                )
-
-                CrewTrainingHandler(filename).save_trained_data(
-                    agent_id=str(agent.role), trained_data=result.model_dump()
-                )
+            for agent in train_crew.agents:
+                if training_data.get(str(agent.id)):
+                    result = TaskEvaluator(agent).evaluate_training_data(
+                        training_data=training_data, agent_id=str(agent.id)
+                    )
+                    CrewTrainingHandler(filename).save_trained_data(
+                        agent_id=str(agent.role), trained_data=result.model_dump()
+                    )
+        except Exception as e:
+            self._logger.log("error", f"Training failed: {e}", color="red")
+            CrewTrainingHandler(TRAINING_DATA_FILE).clear()
+            CrewTrainingHandler(filename).clear()
+            raise
 
     def kickoff(
         self,
```
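Per the crewAI docs, the public entry point here is `crew.train(n_iterations=..., filename=..., inputs=...)`; the new contract is that a failure in any iteration clears both pickle files before re-raising, so the next session never starts from stale partial data. A standalone model of that contract (file names and the simulated failure are invented for the sketch):

```python
# Standalone model of train()'s cleanup-on-failure contract.
# File names and the mid-run failure are invented for the demo.
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"


def clear(path: str) -> None:
    print(f"clearing {path}")


def train(n_iterations: int) -> None:
    try:
        for i in range(n_iterations):
            if i == 1:
                raise RuntimeError("kickoff failed")  # simulated failure
    except Exception:
        # Wipe partial data so the next training session starts clean.
        clear(TRAINING_DATA_FILE)
        clear(TRAINED_AGENTS_DATA_FILE)
        raise


try:
    train(3)
except RuntimeError as e:
    print("training aborted:", e)
```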
```diff
@@ -92,13 +92,34 @@ class TaskEvaluator:
         """
 
         output_training_data = training_data[agent_id]
 
         final_aggregated_data = ""
-        for _, data in output_training_data.items():
+
+        for iteration, data in output_training_data.items():
+            improved_output = data.get("improved_output")
+            initial_output = data.get("initial_output")
+            human_feedback = data.get("human_feedback")
+
+            if not all([improved_output, initial_output, human_feedback]):
+                missing_fields = [
+                    field
+                    for field in ["improved_output", "initial_output", "human_feedback"]
+                    if not data.get(field)
+                ]
+                error_msg = (
+                    f"Critical training data error: Missing fields ({', '.join(missing_fields)}) "
+                    f"for agent {agent_id} in iteration {iteration}.\n"
+                    "This indicates a broken training process. "
+                    "Cannot proceed with evaluation.\n"
+                    "Please check your training implementation."
+                )
+                raise ValueError(error_msg)
+
             final_aggregated_data += (
-                f"Initial Output:\n{data.get('initial_output', '')}\n\n"
-                f"Human Feedback:\n{data.get('human_feedback', '')}\n\n"
-                f"Improved Output:\n{data.get('improved_output', '')}\n\n"
+                f"Iteration: {iteration}\n"
+                f"Initial Output:\n{initial_output}\n\n"
+                f"Human Feedback:\n{human_feedback}\n\n"
+                f"Improved Output:\n{improved_output}\n\n"
+                "------------------------------------------------\n\n"
             )
 
         evaluation_query = (
```
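The loop now fails fast instead of silently interpolating empty strings into the aggregated text. For example, a record shaped like the hypothetical one below would raise `ValueError`, because `improved_output` was never written:

```python
# Hypothetical record that the new validation rejects:
training_data = {
    "agent-1": {
        0: {
            "initial_output": "Draft answer",
            "human_feedback": "Add a summary section",
            # "improved_output" missing -> ValueError: Critical training data
            # error: Missing fields (improved_output) for agent agent-1 in iteration 0.
        }
    }
}
```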
```diff
@@ -1,3 +1,5 @@
+import os
+
 from crewai.utilities.file_handler import PickleHandler
 
 
```
```diff
@@ -29,3 +31,10 @@ class CrewTrainingHandler(PickleHandler):
             data[agent_id] = {train_iteration: new_data}
 
         self.save(data)
+
+    def clear(self) -> None:
+        """Clear the training data by removing the file or resetting its contents."""
+        if os.path.exists(self.file_path):
+            with open(self.file_path, "wb") as file:
+                # Overwrite with an empty dictionary
+                self.save({})
```
```diff
@@ -48,9 +48,9 @@ def test_evaluate_training_data(converter_mock):
         mock.call(
             llm=original_agent.llm,
             text="Assess the quality of the training data based on the llm output, human feedback , and llm "
-            "output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
-            "1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
-            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- Provide "
+            "output improved result.\n\nIteration: data1\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
+            "1\n\nImproved Output:\nImproved output 1\n\n------------------------------------------------\n\nIteration: data2\nInitial Output:\nInitial output 2\n\nHuman "
+            "Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\n------------------------------------------------\n\nPlease provide:\n- Provide "
             "a list of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's "
             "performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific "
            "action items for future tasks. Ensure all key and specificpoints from the human feedback are "
```