WIP: core logic of seq and heir for executing tasks added into one

This commit is contained in:
Lorenze Jay
2024-07-10 07:58:32 -07:00
parent 626e30d4d1
commit 7c4b91b852

View File

@@ -494,12 +494,11 @@ class Crew(BaseModel):
def _execute_tasks( def _execute_tasks(
self, self,
tasks, tasks,
start_index=0, manager: Optional[BaseAgent] = None,
is_replay=False,
): ):
task_outputs: List[TaskOutput] = [] task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = [] futures: List[Tuple[Task, Future[TaskOutput], int]] = []
for task_index, task in enumerate(tasks[start_index:], start=start_index): for task_index, task in enumerate(tasks):
if task.agent and task.agent.allow_delegation: if task.agent and task.agent.allow_delegation:
agents_for_delegation = [ agents_for_delegation = [
agent for agent in self.agents if agent != task.agent agent for agent in self.agents if agent != task.agent
@@ -508,24 +507,39 @@ class Crew(BaseModel):
task.tools += task.agent.get_delegation_tools(agents_for_delegation) task.tools += task.agent.get_delegation_tools(agents_for_delegation)
role = task.agent.role if task.agent is not None else "None" role = task.agent.role if task.agent is not None else "None"
log_prefix = "== Replaying from" if is_replay else "==" if manager:
log_color = "bold_blue" if is_replay else "bold_purple"
self._logger.log( self._logger.log(
"debug", f"{log_prefix} Working Agent: {role}", color=log_color "debug", f"Manager Agent: {manager.role}", color="bold_purple"
) )
else:
self._logger.log("debug", f"Working Agent: {role}", color="bold_purple")
self._logger.log( self._logger.log(
"info", "info",
f"{log_prefix} {'Replaying' if is_replay else 'Starting'} Task: {task.description}", f"Starting Task: {task.description}",
color=log_color, color="bold_purple",
) )
if self.output_log_file: if self.output_log_file:
self._file_handler.log( self._file_handler.log(
agent=role, task=task.description, status="started" agent=role, task=task.description, status="started"
) )
if task.async_execution: if task.async_execution:
context = aggregate_raw_outputs_from_task_outputs(task_outputs) context = aggregate_raw_outputs_from_task_outputs(task_outputs)
if self.process == Process.hierarchical:
if task.agent:
task.tools += task.agent.get_delegation_tools(self.agents)
future = task.execute_async(
agent=task.agent, context=context, tools=task.tools
)
futures.append((task, future, task_index))
else:
if manager:
manager.tools = manager.get_delegation_tools(self.agents)
future = task.execute_async(
agent=manager, context=context, tools=manager.tools
)
futures.append((task, future, task_index))
else:
future = task.execute_async( future = task.execute_async(
agent=task.agent, context=context, tools=task.tools agent=task.agent, context=context, tools=task.tools
) )
@@ -536,12 +550,33 @@ class Crew(BaseModel):
futures.clear() futures.clear()
context = aggregate_raw_outputs_from_task_outputs(task_outputs) context = aggregate_raw_outputs_from_task_outputs(task_outputs)
if self.process == Process.hierarchical:
if task.agent:
task.tools += task.agent.get_delegation_tools(self.agents)
task_output = task.execute_sync( task_output = task.execute_sync(
agent=task.agent, context=context, tools=task.tools agent=task.agent, context=context, tools=task.tools
) )
task_outputs = [task_output] task_outputs = [task_output]
self._process_task_result(task, task_output) self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index) self._store_execution_log(task, task_output, task_index)
else:
if manager:
manager.tools = manager.get_delegation_tools(self.agents)
task_output = task.execute_sync(
agent=manager, context=context, tools=manager.tools
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index)
else:
task_output = task.execute_sync(
agent=task.agent, context=context, tools=task.tools
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index)
if futures: if futures:
task_outputs = self._process_async_tasks(futures) task_outputs = self._process_async_tasks(futures)
@@ -582,6 +617,62 @@ class Crew(BaseModel):
return agent return agent
return None return None
def replay_from_task_easy(self, task_id: str, inputs: Dict[str, Any] | None = None):
task_outputs = []
stored_outputs = self._load_stored_outputs()
start_index = next(
(
index
for (index, d) in enumerate(stored_outputs)
if d["task_id"] == str(task_id)
),
None,
)
# todo: depending if its seqential or hierarchial how can we run this properly?
if self.process == Process.sequential:
for task in self.tasks[start_index:]:
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
task_output = task.execute_sync(
agent=task.agent, context=context, tools=task.tools
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, start_index)
start_index += 1
elif self.process == Process.hierarchical:
manager_task = self.tasks[start_index]
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
task_output = manager_task.execute_sync(
agent=manager_task.agent, context=context, tools=manager_task.tools
)
task_outputs.append(task_output)
self._process_task_result(manager_task, task_output)
self._store_execution_log(manager_task, task_output, start_index)
# Assuming hierarchical process involves manager delegating tasks to other agents
for task in self.tasks[start_index + 1 :]:
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
task_output = task.execute_sync(
agent=task.agent, context=context, tools=task.tools
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, start_index)
start_index += 1
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
# all_tasks = self.tasks.copy()
# stored_outputs = self._load_stored_outputs()
# start_index = next(
# (index for (index, d) in enumerate(stored_outputs) if d["task_id"] == str(task_id)),
# None,
# )
# this works sequentially
def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None): def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None):
all_tasks = self.tasks.copy() all_tasks = self.tasks.copy()
@@ -763,52 +854,54 @@ class Crew(BaseModel):
) )
self.manager_agent = manager self.manager_agent = manager
task_outputs: List[TaskOutput] = [] task_outputs = self._execute_tasks(self.tasks, manager)
futures: List[Tuple[Task, Future[TaskOutput]]] = []
for task in self.tasks: # task_outputs: List[TaskOutput] = []
self._logger.log("debug", f"Working Agent: {manager.role}") # futures: List[Tuple[Task, Future[TaskOutput]]] = []
self._logger.log("info", f"Starting Task: {task.description}")
if self.output_log_file: # for task in self.tasks:
self._file_handler.log( # self._logger.log("debug", f"Working Agent: {manager.role}")
agent=manager.role, task=task.description, status="started" # self._logger.log("info", f"Starting Task: {task.description}")
)
if task.async_execution: # if self.output_log_file:
context = aggregate_raw_outputs_from_task_outputs(task_outputs) # self._file_handler.log(
future = task.execute_async( # agent=manager.role, task=task.description, status="started"
agent=manager, context=context, tools=manager.tools # )
)
futures.append((task, future))
else:
# Before executing a synchronous task, wait for all async tasks to complete
if futures:
# Clear task_outputs before processing async tasks
task_outputs = []
for future_task, future in futures:
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
# Clear the futures list after processing all async results # if task.async_execution:
futures.clear() # context = aggregate_raw_outputs_from_task_outputs(task_outputs)
# future = task.execute_async(
# agent=manager, context=context, tools=manager.tools
# )
# futures.append((task, future))
# else:
# # Before executing a synchronous task, wait for all async tasks to complete
# if futures:
# # Clear task_outputs before processing async tasks
# task_outputs = []
# for future_task, future in futures:
# task_output = future.result()
# task_outputs.append(task_output)
# self._process_task_result(future_task, task_output)
context = aggregate_raw_outputs_from_task_outputs(task_outputs) # # Clear the futures list after processing all async results
task_output = task.execute_sync( # futures.clear()
agent=manager, context=context, tools=manager.tools
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
# Process any remaining async results # context = aggregate_raw_outputs_from_task_outputs(task_outputs)
if futures: # task_output = task.execute_sync(
# Clear task_outputs before processing async tasks # agent=manager, context=context, tools=manager.tools
task_outputs = [] # )
for future_task, future in futures: # task_outputs = [task_output]
task_output = future.result() # self._process_task_result(task, task_output)
task_outputs.append(task_output)
self._process_task_result(future_task, task_output) # # Process any remaining async results
# if futures:
# # Clear task_outputs before processing async tasks
# task_outputs = []
# for future_task, future in futures:
# task_output = future.result()
# task_outputs.append(task_output)
# self._process_task_result(future_task, task_output)
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
self._finish_execution(final_string_output) self._finish_execution(final_string_output)