From bde0a3e99cf439395f31e5c0eaef892eeabc83a1 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Tue, 16 Jul 2024 20:11:52 -0700 Subject: [PATCH] code cleanup --- src/crewai/crew.py | 67 +++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index f025145fc..719d246ae 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -649,34 +649,10 @@ class Crew(BaseModel): self._log_task_start(task, agent_to_use) if isinstance(task, ConditionalTask): - if futures: - task_outputs.extend( - self._process_async_tasks(futures, was_replayed) - ) - futures.clear() - - previous_output = task_outputs[task_index - 1] if task_outputs else None - if previous_output is not None and not task.should_execute( - previous_output - ): - self._logger.log( - "debug", - f"Skipping conditional task: {task.description}", - color="yellow", - ) - skipped_task_output = TaskOutput( - description=task.description, - raw="", - agent=task.agent.role if task.agent else "", - output_format=OutputFormat.RAW, - ) - - if not was_replayed: - self._store_execution_log( - task, - skipped_task_output, - task_index, - ) + skipped_task_output = self._handle_conditional_task( + task, task_outputs, futures, task_index, was_replayed + ) + if skipped_task_output: continue if task.async_execution: @@ -691,9 +667,7 @@ class Crew(BaseModel): futures.append((task, future, task_index)) else: if futures: - task_outputs.extend( - self._process_async_tasks(futures, was_replayed) - ) + task_outputs = self._process_async_tasks(futures, was_replayed) futures.clear() context = self._get_context(task, task_outputs) @@ -711,6 +685,37 @@ class Crew(BaseModel): return self._create_crew_output(task_outputs) + def _handle_conditional_task( + self, + task: ConditionalTask, + task_outputs: List[TaskOutput], + futures: List[Tuple[Task, Future[TaskOutput], int]], + task_index: int, + was_replayed: bool, + ) -> Optional[TaskOutput]: + if futures: + task_outputs = self._process_async_tasks(futures, was_replayed) + futures.clear() + + previous_output = task_outputs[task_index - 1] if task_outputs else None + if previous_output is not None and not task.should_execute(previous_output): + self._logger.log( + "debug", + f"Skipping conditional task: {task.description}", + color="yellow", + ) + skipped_task_output = TaskOutput( + description=task.description, + raw="", + agent=task.agent.role if task.agent else "", + output_format=OutputFormat.RAW, + ) + + if not was_replayed: + self._store_execution_log(task, skipped_task_output, task_index) + return skipped_task_output + return None + def _prepare_task(self, task: Task, manager: Optional[BaseAgent]): if self.process == Process.hierarchical: self._update_manager_tools(task, manager)