mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
replay working for both seq and hier just need tests
This commit is contained in:
@@ -33,11 +33,11 @@ from crewai.telemetry import Telemetry
|
|||||||
from crewai.tools.agent_tools import AgentTools
|
from crewai.tools.agent_tools import AgentTools
|
||||||
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
||||||
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||||
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
|
|
||||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||||
from crewai.utilities.file_handler import TaskOutputJsonHandler
|
from crewai.utilities.file_handler import TaskOutputJsonHandler
|
||||||
from crewai.utilities.formatter import (
|
from crewai.utilities.formatter import (
|
||||||
aggregate_raw_outputs_from_task_outputs,
|
aggregate_raw_outputs_from_task_outputs,
|
||||||
|
aggregate_raw_outputs_from_tasks,
|
||||||
)
|
)
|
||||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||||
|
|
||||||
@@ -392,11 +392,10 @@ class Crew(BaseModel):
|
|||||||
) -> CrewOutput:
|
) -> CrewOutput:
|
||||||
"""Starts the crew to work on its assigned tasks."""
|
"""Starts the crew to work on its assigned tasks."""
|
||||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||||
self.execution_logs = []
|
|
||||||
self._task_output_handler.reset()
|
self._task_output_handler.reset()
|
||||||
if inputs is not None:
|
if inputs is not None:
|
||||||
self._inputs = inputs
|
self._inputs = inputs
|
||||||
# self._interpolate_inputs(inputs)
|
self._interpolate_inputs(inputs)
|
||||||
self._set_tasks_callbacks()
|
self._set_tasks_callbacks()
|
||||||
|
|
||||||
i18n = I18N(prompt_file=self.prompt_file)
|
i18n = I18N(prompt_file=self.prompt_file)
|
||||||
@@ -422,7 +421,7 @@ class Crew(BaseModel):
|
|||||||
if self.process == Process.sequential:
|
if self.process == Process.sequential:
|
||||||
result = self._run_sequential_process()
|
result = self._run_sequential_process()
|
||||||
elif self.process == Process.hierarchical:
|
elif self.process == Process.hierarchical:
|
||||||
result = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
|
result = self._run_hierarchical_process()
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
f"The process '{self.process}' is not implemented yet."
|
f"The process '{self.process}' is not implemented yet."
|
||||||
@@ -510,7 +509,13 @@ class Crew(BaseModel):
|
|||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def _store_execution_log(self, task: Task, output, task_index):
|
def _store_execution_log(
|
||||||
|
self,
|
||||||
|
task: Task,
|
||||||
|
output: TaskOutput,
|
||||||
|
task_index: int,
|
||||||
|
was_replayed: bool = False,
|
||||||
|
):
|
||||||
if self._inputs:
|
if self._inputs:
|
||||||
inputs = self._inputs
|
inputs = self._inputs
|
||||||
else:
|
else:
|
||||||
@@ -521,21 +526,28 @@ class Crew(BaseModel):
|
|||||||
"expected_output": task.expected_output,
|
"expected_output": task.expected_output,
|
||||||
"agent_role": task.agent.role if task.agent else "None",
|
"agent_role": task.agent.role if task.agent else "None",
|
||||||
"output": {
|
"output": {
|
||||||
"description": task.description,
|
"description": output.description,
|
||||||
"summary": task.description,
|
"summary": output.summary,
|
||||||
"raw_output": output.raw_output,
|
"raw": output.raw,
|
||||||
"pydantic_output": output.pydantic_output,
|
"pydantic": output.pydantic,
|
||||||
"json_output": output.json_output,
|
"json_dict": output.json_dict,
|
||||||
"agent": task.agent.role if task.agent else "None",
|
"output_format": output.output_format,
|
||||||
|
"agent": output.agent,
|
||||||
},
|
},
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
"task_index": task_index,
|
"task_index": task_index,
|
||||||
"inputs": inputs,
|
"inputs": inputs,
|
||||||
|
"was_replayed": was_replayed,
|
||||||
}
|
}
|
||||||
self.execution_logs.append(log)
|
# Update the existing log or append if it's a new entry
|
||||||
self._task_output_handler.append(log)
|
if task_index < len(self.execution_logs):
|
||||||
|
self.execution_logs[task_index] = log
|
||||||
|
else:
|
||||||
|
self.execution_logs.append(log)
|
||||||
|
|
||||||
def _run_sequential_process(self):
|
self._task_output_handler.update(task_index, log)
|
||||||
|
|
||||||
|
def _run_sequential_process(self) -> CrewOutput:
|
||||||
"""Executes tasks sequentially and returns the final output."""
|
"""Executes tasks sequentially and returns the final output."""
|
||||||
self.execution_logs = []
|
self.execution_logs = []
|
||||||
return self._execute_tasks(self.tasks)
|
return self._execute_tasks(self.tasks)
|
||||||
@@ -544,7 +556,7 @@ class Crew(BaseModel):
|
|||||||
self,
|
self,
|
||||||
tasks: List[Task],
|
tasks: List[Task],
|
||||||
manager: Optional[BaseAgent] = None,
|
manager: Optional[BaseAgent] = None,
|
||||||
):
|
) -> CrewOutput:
|
||||||
task_outputs: List[TaskOutput] = []
|
task_outputs: List[TaskOutput] = []
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
||||||
for task_index, task in enumerate(tasks):
|
for task_index, task in enumerate(tasks):
|
||||||
@@ -576,7 +588,7 @@ class Crew(BaseModel):
|
|||||||
agent=role, task=task.description, status="started"
|
agent=role, task=task.description, status="started"
|
||||||
)
|
)
|
||||||
if task.async_execution:
|
if task.async_execution:
|
||||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
context = self._set_context(task, task_outputs)
|
||||||
if agent_to_use:
|
if agent_to_use:
|
||||||
future = task.execute_async(
|
future = task.execute_async(
|
||||||
agent=agent_to_use,
|
agent=agent_to_use,
|
||||||
@@ -594,8 +606,7 @@ class Crew(BaseModel):
|
|||||||
task_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures)
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
context = self._set_context(task, task_outputs)
|
||||||
|
|
||||||
if agent_to_use:
|
if agent_to_use:
|
||||||
task_output = task.execute_sync(
|
task_output = task.execute_sync(
|
||||||
agent=agent_to_use,
|
agent=agent_to_use,
|
||||||
@@ -609,7 +620,29 @@ class Crew(BaseModel):
|
|||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures)
|
||||||
|
|
||||||
return task_outputs
|
final_task_output = task_outputs[0]
|
||||||
|
|
||||||
|
final_string_output = final_task_output.raw
|
||||||
|
self._finish_execution(final_string_output)
|
||||||
|
|
||||||
|
token_usage = self.calculate_usage_metrics()
|
||||||
|
|
||||||
|
return CrewOutput(
|
||||||
|
raw=final_task_output.raw,
|
||||||
|
pydantic=final_task_output.pydantic,
|
||||||
|
json_dict=final_task_output.json_dict,
|
||||||
|
tasks_output=[task.output for task in self.tasks if task.output],
|
||||||
|
token_usage=token_usage,
|
||||||
|
)
|
||||||
|
# return task_outputs
|
||||||
|
|
||||||
|
def _set_context(self, task: Task, task_outputs: List[TaskOutput]):
|
||||||
|
context = (
|
||||||
|
aggregate_raw_outputs_from_tasks(task.context)
|
||||||
|
if task.context
|
||||||
|
else aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||||
|
)
|
||||||
|
return context
|
||||||
|
|
||||||
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
|
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
|
||||||
role = task.agent.role if task.agent is not None else "None"
|
role = task.agent.role if task.agent is not None else "None"
|
||||||
@@ -620,14 +653,16 @@ class Crew(BaseModel):
|
|||||||
def _process_async_tasks(
|
def _process_async_tasks(
|
||||||
self,
|
self,
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
||||||
|
was_replayed: bool = False,
|
||||||
) -> List[TaskOutput]:
|
) -> List[TaskOutput]:
|
||||||
task_outputs = []
|
task_outputs = []
|
||||||
for future_task, future, task_index in futures:
|
for future_task, future, task_index in futures:
|
||||||
task_output = future.result()
|
task_output = future.result()
|
||||||
task_outputs.append(task_output)
|
task_outputs.append(task_output)
|
||||||
self._process_task_result(future_task, task_output)
|
self._process_task_result(future_task, task_output)
|
||||||
self._store_execution_log(future_task, task_output, task_index)
|
self._store_execution_log(
|
||||||
|
future_task, task_output, task_index, was_replayed
|
||||||
|
)
|
||||||
return task_outputs
|
return task_outputs
|
||||||
|
|
||||||
def _get_agent(self, role: str) -> Optional[BaseAgent]:
|
def _get_agent(self, role: str) -> Optional[BaseAgent]:
|
||||||
@@ -645,10 +680,31 @@ class Crew(BaseModel):
|
|||||||
return agent
|
return agent
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def replay_from_task_easy(self, task_id: str, inputs: Dict[str, Any] | None = None):
|
def _initialize_execution(self, inputs: Optional[Dict[str, Any]]) -> None:
|
||||||
task_outputs = []
|
"""Initializes the execution by setting up necessary attributes and states."""
|
||||||
stored_outputs = self._load_stored_outputs()
|
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||||
start_index = next(
|
# self.execution_logs = []
|
||||||
|
self._task_output_handler.reset()
|
||||||
|
if inputs is not None:
|
||||||
|
self._inputs = inputs
|
||||||
|
self._interpolate_inputs(inputs)
|
||||||
|
self._set_tasks_callbacks()
|
||||||
|
i18n = I18N(prompt_file=self.prompt_file)
|
||||||
|
for agent in self.agents:
|
||||||
|
agent.i18n = i18n
|
||||||
|
agent.crew = self # type: ignore[attr-defined]
|
||||||
|
if not agent.function_calling_llm: # type: ignore[attr-defined]
|
||||||
|
agent.function_calling_llm = self.function_calling_llm # type: ignore[attr-defined]
|
||||||
|
if agent.allow_code_execution: # type: ignore[attr-defined]
|
||||||
|
agent.tools += agent.get_code_execution_tools() # type: ignore[attr-defined]
|
||||||
|
if not agent.step_callback: # type: ignore[attr-defined]
|
||||||
|
agent.step_callback = self.step_callback # type: ignore[attr-defined]
|
||||||
|
agent.create_agent_executor()
|
||||||
|
|
||||||
|
def _find_task_index(
|
||||||
|
self, task_id: str, stored_outputs: List[Dict[str, Any]]
|
||||||
|
) -> Optional[int]:
|
||||||
|
return next(
|
||||||
(
|
(
|
||||||
index
|
index
|
||||||
for (index, d) in enumerate(stored_outputs)
|
for (index, d) in enumerate(stored_outputs)
|
||||||
@@ -656,80 +712,20 @@ class Crew(BaseModel):
|
|||||||
),
|
),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
# todo: depending if its seqential or hierarchial how can we run this properly?
|
|
||||||
if self.process == Process.sequential:
|
|
||||||
for task in self.tasks[start_index:]:
|
|
||||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
|
||||||
task_output = task.execute_sync(
|
|
||||||
agent=task.agent, context=context, tools=task.tools
|
|
||||||
)
|
|
||||||
task_outputs.append(task_output)
|
|
||||||
self._process_task_result(task, task_output)
|
|
||||||
self._store_execution_log(task, task_output, start_index)
|
|
||||||
start_index += 1
|
|
||||||
|
|
||||||
elif self.process == Process.hierarchical:
|
|
||||||
manager_task = self.tasks[start_index]
|
|
||||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
|
||||||
task_output = manager_task.execute_sync(
|
|
||||||
agent=manager_task.agent, context=context, tools=manager_task.tools
|
|
||||||
)
|
|
||||||
task_outputs.append(task_output)
|
|
||||||
self._process_task_result(manager_task, task_output)
|
|
||||||
self._store_execution_log(manager_task, task_output, start_index)
|
|
||||||
|
|
||||||
# Assuming hierarchical process involves manager delegating tasks to other agents
|
|
||||||
for task in self.tasks[start_index + 1 :]:
|
|
||||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
|
||||||
task_output = task.execute_sync(
|
|
||||||
agent=task.agent, context=context, tools=task.tools
|
|
||||||
)
|
|
||||||
task_outputs.append(task_output)
|
|
||||||
self._process_task_result(task, task_output)
|
|
||||||
self._store_execution_log(task, task_output, start_index)
|
|
||||||
start_index += 1
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise NotImplementedError(
|
|
||||||
f"The process '{self.process}' is not implemented yet."
|
|
||||||
)
|
|
||||||
|
|
||||||
# this works sequentially
|
|
||||||
def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None):
|
|
||||||
all_tasks = self.tasks.copy()
|
|
||||||
|
|
||||||
|
def replay_from_task(
|
||||||
|
self, task_id: str, inputs: Dict[str, Any] | None = None
|
||||||
|
) -> CrewOutput:
|
||||||
stored_outputs = self._load_stored_outputs()
|
stored_outputs = self._load_stored_outputs()
|
||||||
start_index = next(
|
start_index = self._find_task_index(task_id, stored_outputs)
|
||||||
(
|
|
||||||
index
|
|
||||||
for (index, d) in enumerate(stored_outputs)
|
|
||||||
if d["task_id"] == str(task_id)
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
# Generate tasks based on what was previously replayed
|
|
||||||
if len(self.tasks) != len(stored_outputs):
|
|
||||||
for output in stored_outputs[start_index:]:
|
|
||||||
matching_index = output["task_index"]
|
|
||||||
matching_task = self.tasks[matching_index]
|
|
||||||
if matching_task:
|
|
||||||
new_task = matching_task.copy()
|
|
||||||
new_task.agent = self._get_agent(output["agent_role"])
|
|
||||||
all_tasks.append(new_task)
|
|
||||||
|
|
||||||
if start_index is None:
|
if start_index is None:
|
||||||
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
|
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
|
||||||
|
|
||||||
# this handles passing the correct context along and updating following task executions with the new task_ouputs as context
|
|
||||||
stored_output_map: Dict[str, dict] = {
|
|
||||||
log["task_id"]: log["output"] for log in stored_outputs
|
|
||||||
}
|
|
||||||
|
|
||||||
task_outputs: List[
|
task_outputs: List[
|
||||||
TaskOutput
|
TaskOutput
|
||||||
] = [] # will propogate the old outputs first to add context then fill the content with the new task outputs relative to the replay start
|
] = [] # will propogate the old outputs first to add context then fill the content with the new task outputs relative to the replay start
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
||||||
context = ""
|
|
||||||
|
|
||||||
# inputs can be overrided with new passed inputs
|
# inputs can be overrided with new passed inputs
|
||||||
replay_inputs = (
|
replay_inputs = (
|
||||||
@@ -738,28 +734,46 @@ class Crew(BaseModel):
|
|||||||
else stored_outputs[start_index].get("inputs", {})
|
else stored_outputs[start_index].get("inputs", {})
|
||||||
)
|
)
|
||||||
|
|
||||||
self._inputs = replay_inputs # overriding
|
self._inputs = replay_inputs
|
||||||
if replay_inputs:
|
if replay_inputs:
|
||||||
self._interpolate_inputs(replay_inputs)
|
self._interpolate_inputs(replay_inputs)
|
||||||
|
if self.process == Process.hierarchical:
|
||||||
|
self._create_manager_agent()
|
||||||
|
for task_index, task in enumerate(self.tasks):
|
||||||
|
if task_index < start_index: # we are skipping this task
|
||||||
|
stored_output = stored_outputs[task_index]["output"]
|
||||||
|
task_output = TaskOutput(
|
||||||
|
description=stored_output["description"],
|
||||||
|
agent=stored_output["agent"],
|
||||||
|
raw=stored_output["raw"],
|
||||||
|
pydantic=stored_output["pydantic"],
|
||||||
|
json_dict=stored_output["json_dict"],
|
||||||
|
output_format=stored_output["output_format"],
|
||||||
|
)
|
||||||
|
task_outputs = [task_output]
|
||||||
|
context = self._set_context(task, task_outputs)
|
||||||
|
|
||||||
for task_index, task in enumerate(all_tasks):
|
|
||||||
if task_index < start_index:
|
|
||||||
# Use stored output for tasks before the replay point
|
|
||||||
if task.id in stored_output_map:
|
|
||||||
stored_output = stored_output_map[task.id]
|
|
||||||
task_output = TaskOutput(
|
|
||||||
description=stored_output["description"],
|
|
||||||
raw_output=stored_output["raw_output"],
|
|
||||||
pydantic_output=stored_output["pydantic_output"],
|
|
||||||
json_output=stored_output["json_output"],
|
|
||||||
agent=stored_output["agent"],
|
|
||||||
)
|
|
||||||
task_outputs.append(task_output)
|
|
||||||
context += (
|
|
||||||
f"\nTask {task_index + 1} Output:\n{task_output.raw_output}"
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
role = task.agent.role if task.agent is not None else "None"
|
if task.agent and task.agent.allow_delegation:
|
||||||
|
agents_for_delegation = [
|
||||||
|
agent for agent in self.agents if agent != task.agent
|
||||||
|
]
|
||||||
|
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
|
||||||
|
task.tools += task.agent.get_delegation_tools(
|
||||||
|
agents_for_delegation
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.process == Process.hierarchical:
|
||||||
|
if task.agent and self.manager_agent:
|
||||||
|
self.manager_agent.tools = task.agent.get_delegation_tools(
|
||||||
|
[task.agent]
|
||||||
|
)
|
||||||
|
if self.manager_agent:
|
||||||
|
self.manager_agent.tools = (
|
||||||
|
self.manager_agent.get_delegation_tools(self.agents)
|
||||||
|
)
|
||||||
|
agent_to_use = task.agent if task.agent else self.manager_agent
|
||||||
|
role = agent_to_use.role if agent_to_use is not None else "None"
|
||||||
log_color = "bold_blue"
|
log_color = "bold_blue"
|
||||||
self._logger.log(
|
self._logger.log(
|
||||||
"debug", f"Replaying Working Agent: {role}", color=log_color
|
"debug", f"Replaying Working Agent: {role}", color=log_color
|
||||||
@@ -776,38 +790,67 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
# Execute task for replay and subsequent tasks
|
# Execute task for replay and subsequent tasks
|
||||||
if task.async_execution:
|
if task.async_execution:
|
||||||
|
context = self._set_context(task, task_outputs)
|
||||||
future = task.execute_async(
|
future = task.execute_async(
|
||||||
agent=task.agent, context=context, tools=task.tools
|
agent=agent_to_use, context=context, tools=task.tools
|
||||||
)
|
)
|
||||||
futures.append((task, future, task_index))
|
futures.append((task, future, task_index))
|
||||||
else:
|
else:
|
||||||
if futures:
|
if futures:
|
||||||
async_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures, True)
|
||||||
task_outputs.extend(async_outputs)
|
|
||||||
for output in async_outputs:
|
|
||||||
context += (
|
|
||||||
f"\nTask {task_index + 1} Output:\n{output.raw_output}"
|
|
||||||
)
|
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
|
context = self._set_context(task, task_outputs)
|
||||||
|
|
||||||
task_output = task.execute_sync(
|
task_output = task.execute_sync(
|
||||||
agent=task.agent, context=context, tools=task.tools
|
agent=agent_to_use, context=context, tools=task.tools
|
||||||
)
|
)
|
||||||
task_outputs.append(task_output)
|
task_outputs = [task_output]
|
||||||
self._process_task_result(task, task_output)
|
self._process_task_result(task, task_output)
|
||||||
self._store_execution_log(task, task_output, task_index)
|
self._store_execution_log(
|
||||||
context += (
|
task, task_output, task_index, was_replayed=True
|
||||||
f"\nTask {task_index + 1} Output:\n{task_output.raw_output}"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Process any remaining async tasks
|
# Process any remaining async tasks
|
||||||
if futures:
|
if futures:
|
||||||
async_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures, True)
|
||||||
task_outputs.extend(async_outputs)
|
|
||||||
# Calculate usage metrics
|
if len(task_outputs) != 1:
|
||||||
|
raise ValueError(
|
||||||
|
"Something went wrong. Kickoff should return only one task output."
|
||||||
|
)
|
||||||
|
final_task_output = task_outputs[0]
|
||||||
|
final_string_output = final_task_output.raw
|
||||||
|
self._finish_execution(final_string_output)
|
||||||
|
|
||||||
token_usage = self.calculate_usage_metrics()
|
token_usage = self.calculate_usage_metrics()
|
||||||
|
|
||||||
# Format and return the final output
|
return CrewOutput(
|
||||||
return self._format_output(task_outputs, token_usage)
|
raw=final_task_output.raw,
|
||||||
|
pydantic=final_task_output.pydantic,
|
||||||
|
json_dict=final_task_output.json_dict,
|
||||||
|
tasks_output=[task.output for task in self.tasks if task.output],
|
||||||
|
token_usage=token_usage,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _create_manager_agent(self):
|
||||||
|
i18n = I18N(prompt_file=self.prompt_file)
|
||||||
|
if self.manager_agent is not None:
|
||||||
|
self.manager_agent.allow_delegation = True
|
||||||
|
manager = self.manager_agent
|
||||||
|
if manager.tools is not None and len(manager.tools) > 0:
|
||||||
|
raise Exception("Manager agent should not have tools")
|
||||||
|
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
|
||||||
|
else:
|
||||||
|
manager = Agent(
|
||||||
|
role=i18n.retrieve("hierarchical_manager_agent", "role"),
|
||||||
|
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
|
||||||
|
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
|
||||||
|
tools=AgentTools(agents=self.agents).tools(),
|
||||||
|
llm=self.manager_llm,
|
||||||
|
verbose=self.verbose,
|
||||||
|
)
|
||||||
|
self.manager_agent = manager
|
||||||
|
|
||||||
def _load_stored_outputs(self) -> List[Dict]:
|
def _load_stored_outputs(self) -> List[Dict]:
|
||||||
try:
|
try:
|
||||||
@@ -826,37 +869,7 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def save_execution_logs(self, filename: str | None = None):
|
def _run_hierarchical_process(self) -> CrewOutput:
|
||||||
"""Save execution logs to a file."""
|
|
||||||
if filename:
|
|
||||||
self._log_file = filename
|
|
||||||
try:
|
|
||||||
with open(self._log_file, "w") as f:
|
|
||||||
json.dump(self.execution_logs, f, indent=2, cls=CrewJSONEncoder)
|
|
||||||
except Exception as e:
|
|
||||||
self._logger.log("error", f"Failed to save execution logs: {str(e)}")
|
|
||||||
|
|
||||||
def load_execution_logs(self, filename: str | None = None):
|
|
||||||
"""Load execution logs from a file."""
|
|
||||||
if filename:
|
|
||||||
self._log_file = filename
|
|
||||||
try:
|
|
||||||
with open(self._log_file, "r") as f:
|
|
||||||
self.execution_logs = json.load(f)
|
|
||||||
except FileNotFoundError:
|
|
||||||
self._logger.log(
|
|
||||||
"warning",
|
|
||||||
f"Log file {self._log_file} not found. Starting with empty logs.",
|
|
||||||
)
|
|
||||||
self.execution_logs = []
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
self._logger.log(
|
|
||||||
"error",
|
|
||||||
f"Failed to parse log file {self._log_file}. Starting with empty logs.",
|
|
||||||
)
|
|
||||||
self.execution_logs = []
|
|
||||||
|
|
||||||
def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]:
|
|
||||||
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
||||||
i18n = I18N(prompt_file=self.prompt_file)
|
i18n = I18N(prompt_file=self.prompt_file)
|
||||||
if self.manager_agent is not None:
|
if self.manager_agent is not None:
|
||||||
@@ -878,16 +891,6 @@ class Crew(BaseModel):
|
|||||||
|
|
||||||
return self._execute_tasks(self.tasks, manager)
|
return self._execute_tasks(self.tasks, manager)
|
||||||
|
|
||||||
# final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
|
||||||
# self._finish_execution(final_string_output)
|
|
||||||
|
|
||||||
# token_usage = self.calculate_usage_metrics()
|
|
||||||
|
|
||||||
# return (
|
|
||||||
# self._format_output(task_outputs, token_usage),
|
|
||||||
# token_usage,
|
|
||||||
# )
|
|
||||||
|
|
||||||
def copy(self):
|
def copy(self):
|
||||||
"""Create a deep copy of the Crew."""
|
"""Create a deep copy of the Crew."""
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
from datetime import datetime
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Dict, Any, List
|
||||||
|
|
||||||
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
|
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
|
||||||
|
|
||||||
|
|
||||||
@@ -97,6 +99,18 @@ class TaskOutputJsonHandler:
|
|||||||
json.dump(file_data, file, indent=2, cls=CrewJSONEncoder)
|
json.dump(file_data, file, indent=2, cls=CrewJSONEncoder)
|
||||||
file.truncate()
|
file.truncate()
|
||||||
|
|
||||||
|
def update(self, task_index: int, log: Dict[str, Any]):
|
||||||
|
logs = self.load()
|
||||||
|
if task_index < len(logs):
|
||||||
|
logs[task_index] = log
|
||||||
|
else:
|
||||||
|
logs.append(log)
|
||||||
|
self.save(logs)
|
||||||
|
|
||||||
|
def save(self, logs: List[Dict[str, Any]]):
|
||||||
|
with open(self.file_path, "w") as file:
|
||||||
|
json.dump(logs, file, indent=2, cls=CrewJSONEncoder)
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset the JSON file by creating an empty file."""
|
"""Reset the JSON file by creating an empty file."""
|
||||||
with open(self.file_path, "w") as f:
|
with open(self.file_path, "w") as f:
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from crewai.crews.crew_output import CrewOutput
|
|||||||
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
||||||
from crewai.process import Process
|
from crewai.process import Process
|
||||||
from crewai.task import Task
|
from crewai.task import Task
|
||||||
|
from crewai.tasks.output_format import OutputFormat
|
||||||
from crewai.tasks.task_output import TaskOutput
|
from crewai.tasks.task_output import TaskOutput
|
||||||
from crewai.utilities import Logger, RPMController
|
from crewai.utilities import Logger, RPMController
|
||||||
|
|
||||||
@@ -1827,17 +1828,17 @@ def test_replay_feature():
|
|||||||
with patch.object(Task, "execute_sync") as mock_execute_task:
|
with patch.object(Task, "execute_sync") as mock_execute_task:
|
||||||
mock_execute_task.return_value = TaskOutput(
|
mock_execute_task.return_value = TaskOutput(
|
||||||
description="Mock description",
|
description="Mock description",
|
||||||
raw_output="Mocked output for list of ideas",
|
raw="Mocked output for list of ideas",
|
||||||
agent="Researcher",
|
agent="Researcher",
|
||||||
|
json_dict=None,
|
||||||
|
output_format=OutputFormat.RAW,
|
||||||
|
pydantic=None,
|
||||||
|
summary="Mocked output for list of ideas",
|
||||||
)
|
)
|
||||||
|
|
||||||
crew.kickoff()
|
crew.kickoff()
|
||||||
crew.replay_from_task(str(write.id))
|
crew.replay_from_task(str(write.id))
|
||||||
# Ensure context was passed correctly
|
# Ensure context was passed correctly
|
||||||
context_passed = mock_execute_task.call_args_list[1][1]["context"]
|
|
||||||
assert (
|
|
||||||
"Mocked output for list of ideas" in context_passed
|
|
||||||
) # ensure context passed
|
|
||||||
assert mock_execute_task.call_count == 3
|
assert mock_execute_task.call_count == 3
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user