diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 3a7468ffd..9148bc2a2 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -771,6 +771,65 @@ class Crew(BaseModel): return self._create_crew_output(task_outputs) + def _get_context_based_output( + self, + task: ConditionalTask, + task_outputs: List[TaskOutput], + task_index: int, + ) -> Optional[TaskOutput]: + """Get the output from explicit context tasks.""" + context_task_outputs = [] + for context_task in task.context: + context_task_index = self._find_task_index(context_task) + if context_task_index != -1 and context_task_index < task_index: + for output in task_outputs: + if output.description == context_task.description: + context_task_outputs.append(output) + break + return context_task_outputs[-1] if context_task_outputs else None + + def _get_non_conditional_output( + self, + task_outputs: List[TaskOutput], + task_index: int, + ) -> Optional[TaskOutput]: + """Get the output from the most recent non-conditional task.""" + non_conditional_outputs = [] + for i in range(task_index): + if i < len(self.tasks) and not isinstance(self.tasks[i], ConditionalTask): + for output in task_outputs: + if output.description == self.tasks[i].description: + non_conditional_outputs.append(output) + break + return non_conditional_outputs[-1] if non_conditional_outputs else None + + def _get_previous_output( + self, + task: ConditionalTask, + task_outputs: List[TaskOutput], + task_index: int, + ) -> Optional[TaskOutput]: + """Get the previous output for a conditional task. + + The order of precedence is: + 1. Output from explicit context tasks + 2. Output from the most recent non-conditional task + 3. Output from the immediately preceding task + """ + if task.context and len(task.context) > 0: + previous_output = self._get_context_based_output(task, task_outputs, task_index) + if previous_output: + return previous_output + + previous_output = self._get_non_conditional_output(task_outputs, task_index) + if previous_output: + return previous_output + + if task_outputs and task_index > 0 and task_index <= len(task_outputs): + return task_outputs[task_index - 1] + + return None + def _handle_conditional_task( self, task: ConditionalTask, @@ -779,29 +838,16 @@ class Crew(BaseModel): task_index: int, was_replayed: bool, ) -> Optional[TaskOutput]: + """Handle a conditional task. + + Determines whether a conditional task should be executed based on the output + of previous tasks. If the task should not be executed, returns a skipped task output. + """ if futures: task_outputs = self._process_async_tasks(futures, was_replayed) futures.clear() - if task.context and len(task.context) > 0: - context_task_outputs = [] - for context_task in task.context: - context_task_index = self._find_task_index(context_task) - if context_task_index != -1 and context_task_index < task_index: - for output in task_outputs: - if output.description == context_task.description: - context_task_outputs.append(output) - break - previous_output = context_task_outputs[-1] if context_task_outputs else None - else: - non_conditional_outputs = [] - for i in range(task_index): - if i < len(self.tasks) and not isinstance(self.tasks[i], ConditionalTask): - for output in task_outputs: - if output.description == self.tasks[i].description: - non_conditional_outputs.append(output) - break - previous_output = non_conditional_outputs[-1] if non_conditional_outputs else (task_outputs[task_index - 1] if task_outputs and task_index > 0 and task_index <= len(task_outputs) else None) + previous_output = self._get_previous_output(task, task_outputs, task_index) if previous_output is not None and not task.should_execute(previous_output): self._logger.log(