diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 7d003ee87..608d79a90 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -480,20 +480,14 @@ class Crew(BaseModel): self.execution_logs.append(log) self._task_output_handler.append(log) - def _run_sequential_process(self) -> CrewOutput: + def _run_sequential_process(self): """Executes tasks sequentially and returns the final output.""" self.execution_logs = [] - task_outputs = self._execute_tasks(self.tasks) - final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) - self._finish_execution(final_string_output) - self.save_execution_logs() - token_usage = self.calculate_usage_metrics() - - return self._format_output(task_outputs, token_usage) + return self._execute_tasks(self.tasks) def _execute_tasks( self, - tasks, + tasks: List[Task], manager: Optional[BaseAgent] = None, ): task_outputs: List[TaskOutput] = [] @@ -506,13 +500,16 @@ class Crew(BaseModel): if len(self.agents) > 1 and len(agents_for_delegation) > 0: task.tools += task.agent.get_delegation_tools(agents_for_delegation) - role = task.agent.role if task.agent is not None else "None" - if manager: - self._logger.log( - "debug", f"Manager Agent: {manager.role}", color="bold_purple" - ) - else: - self._logger.log("debug", f"Working Agent: {role}", color="bold_purple") + if self.process == Process.hierarchical: + if task.agent and manager: + manager.tools = task.agent.get_delegation_tools([task.agent]) + if manager: + manager.tools = manager.get_delegation_tools(self.agents) + + agent_to_use = task.agent if task.agent else manager + role = agent_to_use.role if agent_to_use is not None else "None" + + self._logger.log("debug", f"Working Agent: {role}", color="bold_purple") self._logger.log( "info", f"Starting Task: {task.description}", @@ -525,56 +522,32 @@ class Crew(BaseModel): ) if task.async_execution: context = aggregate_raw_outputs_from_task_outputs(task_outputs) - if self.process == Process.hierarchical: - if task.agent: - task.tools += task.agent.get_delegation_tools(self.agents) - future = task.execute_async( - agent=task.agent, context=context, tools=task.tools - ) - futures.append((task, future, task_index)) - else: - if manager: - manager.tools = manager.get_delegation_tools(self.agents) - future = task.execute_async( - agent=manager, context=context, tools=manager.tools - ) - futures.append((task, future, task_index)) - else: + if agent_to_use: future = task.execute_async( - agent=task.agent, context=context, tools=task.tools + agent=agent_to_use, + context=context, + tools=agent_to_use.tools, ) futures.append((task, future, task_index)) - else: + else: # sequential async + self._logger.log( + "warning", f"No agent available for task: {task.description}" + ) + + else: # sync execution if futures: task_outputs = self._process_async_tasks(futures) futures.clear() context = aggregate_raw_outputs_from_task_outputs(task_outputs) - if self.process == Process.hierarchical: - if task.agent: - task.tools += task.agent.get_delegation_tools(self.agents) - task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools - ) - task_outputs = [task_output] - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index) - else: - if manager: - manager.tools = manager.get_delegation_tools(self.agents) - task_output = task.execute_sync( - agent=manager, context=context, tools=manager.tools - ) - task_outputs = [task_output] - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index) - else: + if agent_to_use: task_output = task.execute_sync( - agent=task.agent, context=context, tools=task.tools + agent=agent_to_use, + context=context, + tools=agent_to_use.tools, ) task_outputs = [task_output] - self._process_task_result(task, task_output) self._store_execution_log(task, task_output, task_index) @@ -665,12 +638,6 @@ class Crew(BaseModel): raise NotImplementedError( f"The process '{self.process}' is not implemented yet." ) - # all_tasks = self.tasks.copy() - # stored_outputs = self._load_stored_outputs() - # start_index = next( - # (index for (index, d) in enumerate(stored_outputs) if d["task_id"] == str(task_id)), - # None, - # ) # this works sequentially def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None): @@ -854,64 +821,17 @@ class Crew(BaseModel): ) self.manager_agent = manager - task_outputs = self._execute_tasks(self.tasks, manager) + return self._execute_tasks(self.tasks, manager) - # task_outputs: List[TaskOutput] = [] - # futures: List[Tuple[Task, Future[TaskOutput]]] = [] + # final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) + # self._finish_execution(final_string_output) - # for task in self.tasks: - # self._logger.log("debug", f"Working Agent: {manager.role}") - # self._logger.log("info", f"Starting Task: {task.description}") + # token_usage = self.calculate_usage_metrics() - # if self.output_log_file: - # self._file_handler.log( - # agent=manager.role, task=task.description, status="started" - # ) - - # if task.async_execution: - # context = aggregate_raw_outputs_from_task_outputs(task_outputs) - # future = task.execute_async( - # agent=manager, context=context, tools=manager.tools - # ) - # futures.append((task, future)) - # else: - # # Before executing a synchronous task, wait for all async tasks to complete - # if futures: - # # Clear task_outputs before processing async tasks - # task_outputs = [] - # for future_task, future in futures: - # task_output = future.result() - # task_outputs.append(task_output) - # self._process_task_result(future_task, task_output) - - # # Clear the futures list after processing all async results - # futures.clear() - - # context = aggregate_raw_outputs_from_task_outputs(task_outputs) - # task_output = task.execute_sync( - # agent=manager, context=context, tools=manager.tools - # ) - # task_outputs = [task_output] - # self._process_task_result(task, task_output) - - # # Process any remaining async results - # if futures: - # # Clear task_outputs before processing async tasks - # task_outputs = [] - # for future_task, future in futures: - # task_output = future.result() - # task_outputs.append(task_output) - # self._process_task_result(future_task, task_output) - - final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) - self._finish_execution(final_string_output) - - token_usage = self.calculate_usage_metrics() - - return ( - self._format_output(task_outputs, token_usage), - token_usage, - ) + # return ( + # self._format_output(task_outputs, token_usage), + # token_usage, + # ) def copy(self): """Create a deep copy of the Crew.""" diff --git a/src/crewai/task.py b/src/crewai/task.py index 3d7228f53..35f16b3a1 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -211,6 +211,7 @@ class Task(BaseModel): tools: Optional[List[Any]], ) -> TaskOutput: """Run the core execution logic of the task.""" + self.agent = agent agent = agent or self.agent if not agent: raise Exception(