From 7c4b91b852d3ac6cf3e49ea08f5ea7a98eace6c6 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Wed, 10 Jul 2024 07:58:32 -0700 Subject: [PATCH] WIP: core logic of seq and heir for executing tasks added into one --- src/crewai/crew.py | 215 ++++++++++++++++++++++++++++++++------------- 1 file changed, 154 insertions(+), 61 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 12ff00f64..7d003ee87 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -494,12 +494,11 @@ class Crew(BaseModel): def _execute_tasks( self, tasks, - start_index=0, - is_replay=False, + manager: Optional[BaseAgent] = None, ): task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput], int]] = [] - for task_index, task in enumerate(tasks[start_index:], start=start_index): + for task_index, task in enumerate(tasks): if task.agent and task.agent.allow_delegation: agents_for_delegation = [ agent for agent in self.agents if agent != task.agent @@ -508,40 +507,76 @@ class Crew(BaseModel): task.tools += task.agent.get_delegation_tools(agents_for_delegation) role = task.agent.role if task.agent is not None else "None" - log_prefix = "== Replaying from" if is_replay else "==" - log_color = "bold_blue" if is_replay else "bold_purple" - self._logger.log( - "debug", f"{log_prefix} Working Agent: {role}", color=log_color - ) + if manager: + self._logger.log( + "debug", f"Manager Agent: {manager.role}", color="bold_purple" + ) + else: + self._logger.log("debug", f"Working Agent: {role}", color="bold_purple") self._logger.log( "info", - f"{log_prefix} {'Replaying' if is_replay else 'Starting'} Task: {task.description}", - color=log_color, + f"Starting Task: {task.description}", + color="bold_purple", ) if self.output_log_file: self._file_handler.log( agent=role, task=task.description, status="started" ) - if task.async_execution: context = aggregate_raw_outputs_from_task_outputs(task_outputs) - future = task.execute_async( - agent=task.agent, context=context, tools=task.tools - ) - futures.append((task, future, task_index)) + if self.process == Process.hierarchical: + if task.agent: + task.tools += task.agent.get_delegation_tools(self.agents) + future = task.execute_async( + agent=task.agent, context=context, tools=task.tools + ) + futures.append((task, future, task_index)) + else: + if manager: + manager.tools = manager.get_delegation_tools(self.agents) + future = task.execute_async( + agent=manager, context=context, tools=manager.tools + ) + futures.append((task, future, task_index)) + else: + future = task.execute_async( + agent=task.agent, context=context, tools=task.tools + ) + futures.append((task, future, task_index)) else: if futures: task_outputs = self._process_async_tasks(futures) futures.clear() context = aggregate_raw_outputs_from_task_outputs(task_outputs) - task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools - ) - task_outputs = [task_output] - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index) + + if self.process == Process.hierarchical: + if task.agent: + task.tools += task.agent.get_delegation_tools(self.agents) + task_output = task.execute_sync( + agent=task.agent, context=context, tools=task.tools + ) + task_outputs = [task_output] + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index) + else: + if manager: + manager.tools = manager.get_delegation_tools(self.agents) + task_output = task.execute_sync( + agent=manager, context=context, tools=manager.tools + ) + task_outputs = [task_output] + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index) + else: + task_output = task.execute_sync( + agent=task.agent, context=context, tools=task.tools + ) + task_outputs = [task_output] + + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index) if futures: task_outputs = self._process_async_tasks(futures) @@ -582,6 +617,62 @@ class Crew(BaseModel): return agent return None + def replay_from_task_easy(self, task_id: str, inputs: Dict[str, Any] | None = None): + task_outputs = [] + stored_outputs = self._load_stored_outputs() + start_index = next( + ( + index + for (index, d) in enumerate(stored_outputs) + if d["task_id"] == str(task_id) + ), + None, + ) + # todo: depending if its seqential or hierarchial how can we run this properly? + if self.process == Process.sequential: + for task in self.tasks[start_index:]: + context = aggregate_raw_outputs_from_task_outputs(task_outputs) + task_output = task.execute_sync( + agent=task.agent, context=context, tools=task.tools + ) + task_outputs.append(task_output) + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, start_index) + start_index += 1 + + elif self.process == Process.hierarchical: + manager_task = self.tasks[start_index] + context = aggregate_raw_outputs_from_task_outputs(task_outputs) + task_output = manager_task.execute_sync( + agent=manager_task.agent, context=context, tools=manager_task.tools + ) + task_outputs.append(task_output) + self._process_task_result(manager_task, task_output) + self._store_execution_log(manager_task, task_output, start_index) + + # Assuming hierarchical process involves manager delegating tasks to other agents + for task in self.tasks[start_index + 1 :]: + context = aggregate_raw_outputs_from_task_outputs(task_outputs) + task_output = task.execute_sync( + agent=task.agent, context=context, tools=task.tools + ) + task_outputs.append(task_output) + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, start_index) + start_index += 1 + + else: + raise NotImplementedError( + f"The process '{self.process}' is not implemented yet." + ) + # all_tasks = self.tasks.copy() + # stored_outputs = self._load_stored_outputs() + # start_index = next( + # (index for (index, d) in enumerate(stored_outputs) if d["task_id"] == str(task_id)), + # None, + # ) + + # this works sequentially def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None): all_tasks = self.tasks.copy() @@ -763,52 +854,54 @@ class Crew(BaseModel): ) self.manager_agent = manager - task_outputs: List[TaskOutput] = [] - futures: List[Tuple[Task, Future[TaskOutput]]] = [] + task_outputs = self._execute_tasks(self.tasks, manager) - for task in self.tasks: - self._logger.log("debug", f"Working Agent: {manager.role}") - self._logger.log("info", f"Starting Task: {task.description}") + # task_outputs: List[TaskOutput] = [] + # futures: List[Tuple[Task, Future[TaskOutput]]] = [] - if self.output_log_file: - self._file_handler.log( - agent=manager.role, task=task.description, status="started" - ) + # for task in self.tasks: + # self._logger.log("debug", f"Working Agent: {manager.role}") + # self._logger.log("info", f"Starting Task: {task.description}") - if task.async_execution: - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - future = task.execute_async( - agent=manager, context=context, tools=manager.tools - ) - futures.append((task, future)) - else: - # Before executing a synchronous task, wait for all async tasks to complete - if futures: - # Clear task_outputs before processing async tasks - task_outputs = [] - for future_task, future in futures: - task_output = future.result() - task_outputs.append(task_output) - self._process_task_result(future_task, task_output) + # if self.output_log_file: + # self._file_handler.log( + # agent=manager.role, task=task.description, status="started" + # ) - # Clear the futures list after processing all async results - futures.clear() + # if task.async_execution: + # context = aggregate_raw_outputs_from_task_outputs(task_outputs) + # future = task.execute_async( + # agent=manager, context=context, tools=manager.tools + # ) + # futures.append((task, future)) + # else: + # # Before executing a synchronous task, wait for all async tasks to complete + # if futures: + # # Clear task_outputs before processing async tasks + # task_outputs = [] + # for future_task, future in futures: + # task_output = future.result() + # task_outputs.append(task_output) + # self._process_task_result(future_task, task_output) - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - task_output = task.execute_sync( - agent=manager, context=context, tools=manager.tools - ) - task_outputs = [task_output] - self._process_task_result(task, task_output) + # # Clear the futures list after processing all async results + # futures.clear() - # Process any remaining async results - if futures: - # Clear task_outputs before processing async tasks - task_outputs = [] - for future_task, future in futures: - task_output = future.result() - task_outputs.append(task_output) - self._process_task_result(future_task, task_output) + # context = aggregate_raw_outputs_from_task_outputs(task_outputs) + # task_output = task.execute_sync( + # agent=manager, context=context, tools=manager.tools + # ) + # task_outputs = [task_output] + # self._process_task_result(task, task_output) + + # # Process any remaining async results + # if futures: + # # Clear task_outputs before processing async tasks + # task_outputs = [] + # for future_task, future in futures: + # task_output = future.result() + # task_outputs.append(task_output) + # self._process_task_result(future_task, task_output) final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) self._finish_execution(final_string_output)