conditional task feat

This commit is contained in:
Lorenze Jay
2024-07-16 12:04:34 -07:00
parent ac2815c781
commit c320fc655e
6 changed files with 664 additions and 2 deletions

View File

@@ -28,6 +28,8 @@ from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.process import Process
from crewai.task import Task
from crewai.conditional_task import ConditionalTask
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
@@ -295,6 +297,29 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
if self.tasks and isinstance(self.tasks[0], ConditionalTask):
raise PydanticCustomError(
"invalid_first_task",
"The first task cannot be a ConditionalTask.",
{},
)
return self
@model_validator(mode="after")
def validate_async_tasks_not_async(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
for task in self.tasks:
if task.async_execution and isinstance(task, ConditionalTask):
raise PydanticCustomError(
"invalid_async_conditional_task",
f"Conditional Task: {task.description} , cannot be executed asynchronously.", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
{},
)
return self
@model_validator(mode="after")
def validate_async_task_cannot_include_sequential_async_tasks_in_context(self):
"""
@@ -622,7 +647,35 @@ class Crew(BaseModel):
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
self._log_task_start(task, agent_to_use)
if isinstance(task, ConditionalTask):
if futures:
task_outputs.extend(
self._process_async_tasks(futures, was_replayed)
)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(
previous_output
):
self._logger.log(
"info",
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = TaskOutput(
description=task.description,
raw="",
agent=task.agent.role if task.agent else "",
output_format=OutputFormat.RAW,
)
if not was_replayed:
self._store_execution_log(
task,
skipped_task_output,
task_index,
)
continue
if task.async_execution:
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
@@ -687,6 +740,34 @@ class Crew(BaseModel):
# Add the new tool
task.tools.append(new_tool)
def _handle_conditional_task(
self,
task: ConditionalTask,
futures: List[Tuple[Task, Future[TaskOutput], int]],
task_outputs: List[TaskOutput],
task_index: int,
was_replayed: bool,
) -> bool:
"""
Handle conditional task execution.
Returns:
bool: True if the task should be executed, False if it should be skipped.
"""
if futures:
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"info",
f"Skipping conditional task: {task.description}",
color="yellow",
)
return False
return True
def _log_task_start(self, task: Task, agent: Optional[BaseAgent]):
color = self._logging_color
role = agent.role if agent else "None"