mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-17 04:48:30 +00:00
Compare commits
1 Commits
0.121.0
...
bugfix/con
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1eb4717352 |
@@ -184,7 +184,7 @@ class Crew(BaseModel):
|
|||||||
default=None,
|
default=None,
|
||||||
description="Maximum number of requests per minute for the crew execution to be respected.",
|
description="Maximum number of requests per minute for the crew execution to be respected.",
|
||||||
)
|
)
|
||||||
prompt_file: str = Field(
|
prompt_file: Optional[str] = Field(
|
||||||
default=None,
|
default=None,
|
||||||
description="Path to the prompt json file to be used for the crew.",
|
description="Path to the prompt json file to be used for the crew.",
|
||||||
)
|
)
|
||||||
@@ -808,6 +808,7 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
if skipped_task_output:
|
if skipped_task_output:
|
||||||
task_outputs.append(skipped_task_output)
|
task_outputs.append(skipped_task_output)
|
||||||
|
last_sync_output = skipped_task_output
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if task.async_execution:
|
if task.async_execution:
|
||||||
@@ -821,8 +822,10 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
futures.append((task, future, task_index))
|
futures.append((task, future, task_index))
|
||||||
else:
|
else:
|
||||||
|
# Process any pending async tasks before executing a sync task
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
processed_outputs = self._process_async_tasks(futures, was_replayed)
|
||||||
|
task_outputs.extend(processed_outputs)
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
context = self._get_context(task, task_outputs)
|
context = self._get_context(task, task_outputs)
|
||||||
@@ -832,11 +835,14 @@ class Crew(BaseModel):
|
|||||||
tools=tools_for_task,
|
tools=tools_for_task,
|
||||||
)
|
)
|
||||||
task_outputs.append(task_output)
|
task_outputs.append(task_output)
|
||||||
|
last_sync_output = task_output
|
||||||
self._process_task_result(task, task_output)
|
self._process_task_result(task, task_output)
|
||||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||||
|
|
||||||
|
# Process any remaining async tasks at the end
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
processed_outputs = self._process_async_tasks(futures, was_replayed)
|
||||||
|
task_outputs.extend(processed_outputs)
|
||||||
|
|
||||||
return self._create_crew_output(task_outputs)
|
return self._create_crew_output(task_outputs)
|
||||||
|
|
||||||
@@ -848,12 +854,17 @@ class Crew(BaseModel):
|
|||||||
task_index: int,
|
task_index: int,
|
||||||
was_replayed: bool,
|
was_replayed: bool,
|
||||||
) -> Optional[TaskOutput]:
|
) -> Optional[TaskOutput]:
|
||||||
|
# Process any pending async tasks to ensure we have the most up-to-date context
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
processed_outputs = self._process_async_tasks(futures, was_replayed)
|
||||||
|
task_outputs.extend(processed_outputs)
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
|
# Get the previous output to evaluate the condition
|
||||||
previous_output = task_outputs[-1] if task_outputs else None
|
previous_output = task_outputs[-1] if task_outputs else None
|
||||||
if previous_output is not None and not task.should_execute(previous_output):
|
|
||||||
|
# If there's no previous output or the condition evaluates to False, skip the task
|
||||||
|
if previous_output is None or not task.should_execute(previous_output):
|
||||||
self._logger.log(
|
self._logger.log(
|
||||||
"debug",
|
"debug",
|
||||||
f"Skipping conditional task: {task.description}",
|
f"Skipping conditional task: {task.description}",
|
||||||
@@ -861,8 +872,13 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
skipped_task_output = task.get_skipped_task_output()
|
skipped_task_output = task.get_skipped_task_output()
|
||||||
|
|
||||||
|
# Store the execution log for the skipped task
|
||||||
if not was_replayed:
|
if not was_replayed:
|
||||||
self._store_execution_log(task, skipped_task_output, task_index)
|
self._store_execution_log(task, skipped_task_output, task_index)
|
||||||
|
|
||||||
|
# Set the output on the task itself so it can be referenced later
|
||||||
|
task.output = skipped_task_output
|
||||||
|
|
||||||
return skipped_task_output
|
return skipped_task_output
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
50
src/crewai/flow/task_decorator.py
Normal file
50
src/crewai/flow/task_decorator.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
from functools import wraps
|
||||||
|
from typing import Any, Callable, Optional, Union, cast
|
||||||
|
|
||||||
|
from crewai.tasks.conditional_task import ConditionalTask
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
|
||||||
|
|
||||||
|
def task(func: Callable) -> Callable:
|
||||||
|
"""
|
||||||
|
Decorator for Flow methods that return a Task.
|
||||||
|
|
||||||
|
This decorator ensures that when a method returns a ConditionalTask,
|
||||||
|
the condition is properly evaluated based on the previous task's output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
func: The method to decorate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The decorated method
|
||||||
|
"""
|
||||||
|
setattr(func, "is_task", True)
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
result = func(self, *args, **kwargs)
|
||||||
|
|
||||||
|
# Set the task name if not already set
|
||||||
|
if hasattr(result, "name") and not result.name:
|
||||||
|
result.name = func.__name__
|
||||||
|
|
||||||
|
# If this is a ConditionalTask, ensure it has a valid condition
|
||||||
|
if isinstance(result, ConditionalTask):
|
||||||
|
# If the condition is a boolean, wrap it in a function
|
||||||
|
if isinstance(result.condition, bool):
|
||||||
|
bool_value = result.condition
|
||||||
|
result.condition = lambda _: bool_value
|
||||||
|
|
||||||
|
# Get the previous task output if available
|
||||||
|
previous_outputs = getattr(self, "_method_outputs", [])
|
||||||
|
previous_output = previous_outputs[-1] if previous_outputs else None
|
||||||
|
|
||||||
|
# If there's a previous output and it's a TaskOutput, check if we should execute
|
||||||
|
if previous_output and isinstance(previous_output, TaskOutput):
|
||||||
|
if not result.should_execute(previous_output):
|
||||||
|
# Return a skipped task output instead of the task
|
||||||
|
return result.get_skipped_task_output()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
return wrapper
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
from functools import wraps
|
from functools import wraps
|
||||||
from typing import Callable
|
from typing import Any, Callable, Optional, Union, cast
|
||||||
|
|
||||||
from crewai import Crew
|
from crewai import Crew
|
||||||
from crewai.project.utils import memoize
|
from crewai.project.utils import memoize
|
||||||
|
from crewai.tasks.conditional_task import ConditionalTask
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
|
||||||
"""Decorators for defining crew components and their behaviors."""
|
"""Decorators for defining crew components and their behaviors."""
|
||||||
|
|
||||||
@@ -21,13 +23,35 @@ def after_kickoff(func):
|
|||||||
|
|
||||||
def task(func):
|
def task(func):
|
||||||
"""Marks a method as a crew task."""
|
"""Marks a method as a crew task."""
|
||||||
func.is_task = True
|
setattr(func, "is_task", True)
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
result = func(*args, **kwargs)
|
result = func(*args, **kwargs)
|
||||||
if not result.name:
|
|
||||||
|
# Set the task name if not already set
|
||||||
|
if hasattr(result, "name") and not result.name:
|
||||||
result.name = func.__name__
|
result.name = func.__name__
|
||||||
|
|
||||||
|
# If this is a ConditionalTask, ensure it has a valid condition
|
||||||
|
if isinstance(result, ConditionalTask):
|
||||||
|
# If the condition is a boolean, wrap it in a function
|
||||||
|
if isinstance(result.condition, bool):
|
||||||
|
bool_value = result.condition
|
||||||
|
result.condition = lambda _: bool_value
|
||||||
|
|
||||||
|
# Get the previous task output if available
|
||||||
|
self = args[0] if args else None
|
||||||
|
if self and hasattr(self, "_method_outputs"):
|
||||||
|
previous_outputs = getattr(self, "_method_outputs", [])
|
||||||
|
previous_output = previous_outputs[-1] if previous_outputs else None
|
||||||
|
|
||||||
|
# If there's a previous output and it's a TaskOutput, check if we should execute
|
||||||
|
if previous_output and isinstance(previous_output, TaskOutput):
|
||||||
|
if not result.should_execute(previous_output):
|
||||||
|
# Return a skipped task output instead of the task
|
||||||
|
return result.get_skipped_task_output()
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return memoize(wrapper)
|
return memoize(wrapper)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Any, Callable
|
from typing import Any, Callable, Union, cast
|
||||||
|
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
|
|
||||||
@@ -14,17 +14,23 @@ class ConditionalTask(Task):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
condition: Callable[[TaskOutput], bool] = Field(
|
condition: Callable[[TaskOutput], bool] = Field(
|
||||||
default=None,
|
default=lambda _: True, # Default to always execute
|
||||||
description="Maximum number of retries for an agent to execute a task when an error occurs.",
|
description="Function that determines whether the task should be executed or a boolean value.",
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
condition: Callable[[Any], bool],
|
condition: Union[Callable[[Any], bool], bool],
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.condition = condition
|
|
||||||
|
# If condition is a boolean, wrap it in a function that always returns that boolean
|
||||||
|
if isinstance(condition, bool):
|
||||||
|
bool_value = condition
|
||||||
|
self.condition = lambda _: bool_value
|
||||||
|
else:
|
||||||
|
self.condition = cast(Callable[[TaskOutput], bool], condition)
|
||||||
|
|
||||||
def should_execute(self, context: TaskOutput) -> bool:
|
def should_execute(self, context: TaskOutput) -> bool:
|
||||||
"""
|
"""
|
||||||
|
|||||||
190
tests/conditional_task_test.py
Normal file
190
tests/conditional_task_test.py
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai import Agent, Crew, Task
|
||||||
|
from crewai.tasks.conditional_task import ConditionalTask
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
|
||||||
|
# Create mock agents for testing
|
||||||
|
researcher = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research information",
|
||||||
|
backstory="You are a researcher with expertise in finding information.",
|
||||||
|
)
|
||||||
|
|
||||||
|
writer = Agent(
|
||||||
|
role="Writer",
|
||||||
|
goal="Write content",
|
||||||
|
backstory="You are a writer with expertise in creating engaging content.",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_conditional_task_with_boolean_false():
|
||||||
|
"""Test that a conditional task with a boolean False condition is skipped."""
|
||||||
|
task1 = Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Use a boolean False directly as the condition
|
||||||
|
task2 = ConditionalTask(
|
||||||
|
description="Conditional task that should be skipped",
|
||||||
|
expected_output="This should not be executed",
|
||||||
|
agent=writer,
|
||||||
|
condition=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher, writer],
|
||||||
|
tasks=[task1, task2],
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task 1 description",
|
||||||
|
raw="Task 1 output",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
# Only the first task should be executed
|
||||||
|
assert mock_execute_sync.call_count == 1
|
||||||
|
|
||||||
|
# The conditional task should be skipped
|
||||||
|
assert task2.output is not None
|
||||||
|
assert task2.output.raw == ""
|
||||||
|
|
||||||
|
# The final output should be from the first task
|
||||||
|
assert result.raw.startswith("Task 1 output")
|
||||||
|
|
||||||
|
|
||||||
|
def test_conditional_task_with_boolean_true():
|
||||||
|
"""Test that a conditional task with a boolean True condition is executed."""
|
||||||
|
task1 = Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Use a boolean True directly as the condition
|
||||||
|
task2 = ConditionalTask(
|
||||||
|
description="Conditional task that should be executed",
|
||||||
|
expected_output="This should be executed",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher, writer],
|
||||||
|
tasks=[task1, task2],
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task output",
|
||||||
|
raw="Task output",
|
||||||
|
agent="Agent",
|
||||||
|
)
|
||||||
|
|
||||||
|
crew.kickoff()
|
||||||
|
|
||||||
|
# Both tasks should be executed
|
||||||
|
assert mock_execute_sync.call_count == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_sequential_conditional_tasks():
|
||||||
|
"""Test that multiple conditional tasks in sequence work correctly."""
|
||||||
|
task1 = Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
# First conditional task (will be executed)
|
||||||
|
task2 = ConditionalTask(
|
||||||
|
description="First conditional task",
|
||||||
|
expected_output="First conditional output",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Second conditional task (will be skipped)
|
||||||
|
task3 = ConditionalTask(
|
||||||
|
description="Second conditional task",
|
||||||
|
expected_output="Second conditional output",
|
||||||
|
agent=researcher,
|
||||||
|
condition=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Third conditional task (will be executed)
|
||||||
|
task4 = ConditionalTask(
|
||||||
|
description="Third conditional task",
|
||||||
|
expected_output="Third conditional output",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher, writer],
|
||||||
|
tasks=[task1, task2, task3, task4],
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task output",
|
||||||
|
raw="Task output",
|
||||||
|
agent="Agent",
|
||||||
|
)
|
||||||
|
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
# Tasks 1, 2, and 4 should be executed (task 3 is skipped)
|
||||||
|
assert mock_execute_sync.call_count == 3
|
||||||
|
|
||||||
|
# Task 3 should be skipped
|
||||||
|
assert task3.output is not None
|
||||||
|
assert task3.output.raw == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_last_task_conditional():
|
||||||
|
"""Test that a conditional task at the end of the task list works correctly."""
|
||||||
|
task1 = Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Last task is conditional and will be skipped
|
||||||
|
task2 = ConditionalTask(
|
||||||
|
description="Last conditional task",
|
||||||
|
expected_output="Last conditional output",
|
||||||
|
agent=writer,
|
||||||
|
condition=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher, writer],
|
||||||
|
tasks=[task1, task2],
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task 1 output",
|
||||||
|
raw="Task 1 output",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
# Only the first task should be executed
|
||||||
|
assert mock_execute_sync.call_count == 1
|
||||||
|
|
||||||
|
# The conditional task should be skipped
|
||||||
|
assert task2.output is not None
|
||||||
|
assert task2.output.raw == ""
|
||||||
|
|
||||||
|
# The final output should be from the first task
|
||||||
|
assert result.raw.startswith("Task 1 output")
|
||||||
152
tests/flow_conditional_task_test.py
Normal file
152
tests/flow_conditional_task_test.py
Normal file
@@ -0,0 +1,152 @@
|
|||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai import Agent, Task
|
||||||
|
from crewai.flow import Flow, listen, start
|
||||||
|
from crewai.project.annotations import task
|
||||||
|
from crewai.tasks.conditional_task import ConditionalTask
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
|
||||||
|
# Create mock agents for testing
|
||||||
|
researcher = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research information",
|
||||||
|
backstory="You are a researcher with expertise in finding information.",
|
||||||
|
)
|
||||||
|
|
||||||
|
writer = Agent(
|
||||||
|
role="Writer",
|
||||||
|
goal="Write content",
|
||||||
|
backstory="You are a writer with expertise in creating engaging content.",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestFlowWithConditionalTasks(Flow):
|
||||||
|
"""Test flow with conditional tasks."""
|
||||||
|
|
||||||
|
@start()
|
||||||
|
@task
|
||||||
|
def initial_task(self):
|
||||||
|
"""Initial task that always executes."""
|
||||||
|
return Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(initial_task)
|
||||||
|
@task
|
||||||
|
def conditional_task_false(self):
|
||||||
|
"""Conditional task that should be skipped."""
|
||||||
|
return ConditionalTask(
|
||||||
|
description="Conditional task that should be skipped",
|
||||||
|
expected_output="This should not be executed",
|
||||||
|
agent=writer,
|
||||||
|
condition=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(initial_task)
|
||||||
|
@task
|
||||||
|
def conditional_task_true(self):
|
||||||
|
"""Conditional task that should be executed."""
|
||||||
|
return ConditionalTask(
|
||||||
|
description="Conditional task that should be executed",
|
||||||
|
expected_output="This should be executed",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(conditional_task_true)
|
||||||
|
@task
|
||||||
|
def final_task(self):
|
||||||
|
"""Final task that executes after the conditional task."""
|
||||||
|
return Task(
|
||||||
|
description="Final task",
|
||||||
|
expected_output="Final output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_with_conditional_tasks():
|
||||||
|
"""Test that conditional tasks work correctly in a Flow."""
|
||||||
|
flow = TestFlowWithConditionalTasks()
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task output",
|
||||||
|
raw="Task output",
|
||||||
|
agent="Agent",
|
||||||
|
)
|
||||||
|
|
||||||
|
flow.kickoff()
|
||||||
|
|
||||||
|
# The initial task, conditional_task_true, and final_task should be executed
|
||||||
|
# conditional_task_false should be skipped
|
||||||
|
assert mock_execute_sync.call_count == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestFlowWithSequentialConditionalTasks(Flow):
|
||||||
|
"""Test flow with sequential conditional tasks."""
|
||||||
|
|
||||||
|
@start()
|
||||||
|
@task
|
||||||
|
def initial_task(self):
|
||||||
|
"""Initial task that always executes."""
|
||||||
|
return Task(
|
||||||
|
description="Initial task",
|
||||||
|
expected_output="Initial output",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(initial_task)
|
||||||
|
@task
|
||||||
|
def conditional_task_1(self):
|
||||||
|
"""First conditional task that should be executed."""
|
||||||
|
return ConditionalTask(
|
||||||
|
description="First conditional task",
|
||||||
|
expected_output="First conditional output",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(conditional_task_1)
|
||||||
|
@task
|
||||||
|
def conditional_task_2(self):
|
||||||
|
"""Second conditional task that should be skipped."""
|
||||||
|
return ConditionalTask(
|
||||||
|
description="Second conditional task",
|
||||||
|
expected_output="Second conditional output",
|
||||||
|
agent=researcher,
|
||||||
|
condition=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
@listen(conditional_task_2)
|
||||||
|
@task
|
||||||
|
def conditional_task_3(self):
|
||||||
|
"""Third conditional task that should be executed."""
|
||||||
|
return ConditionalTask(
|
||||||
|
description="Third conditional task",
|
||||||
|
expected_output="Third conditional output",
|
||||||
|
agent=writer,
|
||||||
|
condition=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_with_sequential_conditional_tasks():
|
||||||
|
"""Test that sequential conditional tasks work correctly in a Flow."""
|
||||||
|
flow = TestFlowWithSequentialConditionalTasks()
|
||||||
|
|
||||||
|
with patch.object(Task, "execute_sync") as mock_execute_sync:
|
||||||
|
mock_execute_sync.return_value = TaskOutput(
|
||||||
|
description="Task output",
|
||||||
|
raw="Task output",
|
||||||
|
agent="Agent",
|
||||||
|
)
|
||||||
|
|
||||||
|
flow.kickoff()
|
||||||
|
|
||||||
|
# The initial_task and conditional_task_1 should be executed
|
||||||
|
# conditional_task_2 should be skipped, and since it's skipped,
|
||||||
|
# conditional_task_3 should not be triggered
|
||||||
|
assert mock_execute_sync.call_count == 2
|
||||||
Reference in New Issue
Block a user