mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-13 10:08:29 +00:00
Refactor: Improve code organization and maintainability in conditional task handling
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -771,6 +771,65 @@ class Crew(BaseModel):
|
||||
|
||||
return self._create_crew_output(task_outputs)
|
||||
|
||||
def _get_context_based_output(
|
||||
self,
|
||||
task: ConditionalTask,
|
||||
task_outputs: List[TaskOutput],
|
||||
task_index: int,
|
||||
) -> Optional[TaskOutput]:
|
||||
"""Get the output from explicit context tasks."""
|
||||
context_task_outputs = []
|
||||
for context_task in task.context:
|
||||
context_task_index = self._find_task_index(context_task)
|
||||
if context_task_index != -1 and context_task_index < task_index:
|
||||
for output in task_outputs:
|
||||
if output.description == context_task.description:
|
||||
context_task_outputs.append(output)
|
||||
break
|
||||
return context_task_outputs[-1] if context_task_outputs else None
|
||||
|
||||
def _get_non_conditional_output(
|
||||
self,
|
||||
task_outputs: List[TaskOutput],
|
||||
task_index: int,
|
||||
) -> Optional[TaskOutput]:
|
||||
"""Get the output from the most recent non-conditional task."""
|
||||
non_conditional_outputs = []
|
||||
for i in range(task_index):
|
||||
if i < len(self.tasks) and not isinstance(self.tasks[i], ConditionalTask):
|
||||
for output in task_outputs:
|
||||
if output.description == self.tasks[i].description:
|
||||
non_conditional_outputs.append(output)
|
||||
break
|
||||
return non_conditional_outputs[-1] if non_conditional_outputs else None
|
||||
|
||||
def _get_previous_output(
|
||||
self,
|
||||
task: ConditionalTask,
|
||||
task_outputs: List[TaskOutput],
|
||||
task_index: int,
|
||||
) -> Optional[TaskOutput]:
|
||||
"""Get the previous output for a conditional task.
|
||||
|
||||
The order of precedence is:
|
||||
1. Output from explicit context tasks
|
||||
2. Output from the most recent non-conditional task
|
||||
3. Output from the immediately preceding task
|
||||
"""
|
||||
if task.context and len(task.context) > 0:
|
||||
previous_output = self._get_context_based_output(task, task_outputs, task_index)
|
||||
if previous_output:
|
||||
return previous_output
|
||||
|
||||
previous_output = self._get_non_conditional_output(task_outputs, task_index)
|
||||
if previous_output:
|
||||
return previous_output
|
||||
|
||||
if task_outputs and task_index > 0 and task_index <= len(task_outputs):
|
||||
return task_outputs[task_index - 1]
|
||||
|
||||
return None
|
||||
|
||||
def _handle_conditional_task(
|
||||
self,
|
||||
task: ConditionalTask,
|
||||
@@ -779,29 +838,16 @@ class Crew(BaseModel):
|
||||
task_index: int,
|
||||
was_replayed: bool,
|
||||
) -> Optional[TaskOutput]:
|
||||
"""Handle a conditional task.
|
||||
|
||||
Determines whether a conditional task should be executed based on the output
|
||||
of previous tasks. If the task should not be executed, returns a skipped task output.
|
||||
"""
|
||||
if futures:
|
||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
||||
futures.clear()
|
||||
|
||||
if task.context and len(task.context) > 0:
|
||||
context_task_outputs = []
|
||||
for context_task in task.context:
|
||||
context_task_index = self._find_task_index(context_task)
|
||||
if context_task_index != -1 and context_task_index < task_index:
|
||||
for output in task_outputs:
|
||||
if output.description == context_task.description:
|
||||
context_task_outputs.append(output)
|
||||
break
|
||||
previous_output = context_task_outputs[-1] if context_task_outputs else None
|
||||
else:
|
||||
non_conditional_outputs = []
|
||||
for i in range(task_index):
|
||||
if i < len(self.tasks) and not isinstance(self.tasks[i], ConditionalTask):
|
||||
for output in task_outputs:
|
||||
if output.description == self.tasks[i].description:
|
||||
non_conditional_outputs.append(output)
|
||||
break
|
||||
previous_output = non_conditional_outputs[-1] if non_conditional_outputs else (task_outputs[task_index - 1] if task_outputs and task_index > 0 and task_index <= len(task_outputs) else None)
|
||||
previous_output = self._get_previous_output(task, task_outputs, task_index)
|
||||
|
||||
if previous_output is not None and not task.should_execute(previous_output):
|
||||
self._logger.log(
|
||||
|
||||
Reference in New Issue
Block a user