Compare commits

...

1 Commits

Author SHA1 Message Date
Brandon Hancock
1eb4717352 wip 2025-03-07 16:39:50 -05:00
6 changed files with 451 additions and 13 deletions

View File

@@ -184,7 +184,7 @@ class Crew(BaseModel):
default=None, default=None,
description="Maximum number of requests per minute for the crew execution to be respected.", description="Maximum number of requests per minute for the crew execution to be respected.",
) )
prompt_file: str = Field( prompt_file: Optional[str] = Field(
default=None, default=None,
description="Path to the prompt json file to be used for the crew.", description="Path to the prompt json file to be used for the crew.",
) )
@@ -808,6 +808,7 @@ class Crew(BaseModel):
) )
if skipped_task_output: if skipped_task_output:
task_outputs.append(skipped_task_output) task_outputs.append(skipped_task_output)
last_sync_output = skipped_task_output
continue continue
if task.async_execution: if task.async_execution:
@@ -821,8 +822,10 @@ class Crew(BaseModel):
) )
futures.append((task, future, task_index)) futures.append((task, future, task_index))
else: else:
# Process any pending async tasks before executing a sync task
if futures: if futures:
task_outputs = self._process_async_tasks(futures, was_replayed) processed_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(processed_outputs)
futures.clear() futures.clear()
context = self._get_context(task, task_outputs) context = self._get_context(task, task_outputs)
@@ -832,11 +835,14 @@ class Crew(BaseModel):
tools=tools_for_task, tools=tools_for_task,
) )
task_outputs.append(task_output) task_outputs.append(task_output)
last_sync_output = task_output
self._process_task_result(task, task_output) self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed) self._store_execution_log(task, task_output, task_index, was_replayed)
# Process any remaining async tasks at the end
if futures: if futures:
task_outputs = self._process_async_tasks(futures, was_replayed) processed_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(processed_outputs)
return self._create_crew_output(task_outputs) return self._create_crew_output(task_outputs)
@@ -848,12 +854,17 @@ class Crew(BaseModel):
task_index: int, task_index: int,
was_replayed: bool, was_replayed: bool,
) -> Optional[TaskOutput]: ) -> Optional[TaskOutput]:
# Process any pending async tasks to ensure we have the most up-to-date context
if futures: if futures:
task_outputs = self._process_async_tasks(futures, was_replayed) processed_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(processed_outputs)
futures.clear() futures.clear()
# Get the previous output to evaluate the condition
previous_output = task_outputs[-1] if task_outputs else None previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
# If there's no previous output or the condition evaluates to False, skip the task
if previous_output is None or not task.should_execute(previous_output):
self._logger.log( self._logger.log(
"debug", "debug",
f"Skipping conditional task: {task.description}", f"Skipping conditional task: {task.description}",
@@ -861,8 +872,13 @@ class Crew(BaseModel):
) )
skipped_task_output = task.get_skipped_task_output() skipped_task_output = task.get_skipped_task_output()
# Store the execution log for the skipped task
if not was_replayed: if not was_replayed:
self._store_execution_log(task, skipped_task_output, task_index) self._store_execution_log(task, skipped_task_output, task_index)
# Set the output on the task itself so it can be referenced later
task.output = skipped_task_output
return skipped_task_output return skipped_task_output
return None return None

View File

@@ -0,0 +1,50 @@
from functools import wraps
from typing import Any, Callable, Optional, Union, cast
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
def task(func: Callable) -> Callable:
"""
Decorator for Flow methods that return a Task.
This decorator ensures that when a method returns a ConditionalTask,
the condition is properly evaluated based on the previous task's output.
Args:
func: The method to decorate
Returns:
The decorated method
"""
setattr(func, "is_task", True)
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
# Set the task name if not already set
if hasattr(result, "name") and not result.name:
result.name = func.__name__
# If this is a ConditionalTask, ensure it has a valid condition
if isinstance(result, ConditionalTask):
# If the condition is a boolean, wrap it in a function
if isinstance(result.condition, bool):
bool_value = result.condition
result.condition = lambda _: bool_value
# Get the previous task output if available
previous_outputs = getattr(self, "_method_outputs", [])
previous_output = previous_outputs[-1] if previous_outputs else None
# If there's a previous output and it's a TaskOutput, check if we should execute
if previous_output and isinstance(previous_output, TaskOutput):
if not result.should_execute(previous_output):
# Return a skipped task output instead of the task
return result.get_skipped_task_output()
return result
return wrapper

View File

@@ -1,8 +1,10 @@
from functools import wraps from functools import wraps
from typing import Callable from typing import Any, Callable, Optional, Union, cast
from crewai import Crew from crewai import Crew
from crewai.project.utils import memoize from crewai.project.utils import memoize
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
"""Decorators for defining crew components and their behaviors.""" """Decorators for defining crew components and their behaviors."""
@@ -21,13 +23,35 @@ def after_kickoff(func):
def task(func): def task(func):
"""Marks a method as a crew task.""" """Marks a method as a crew task."""
func.is_task = True setattr(func, "is_task", True)
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
result = func(*args, **kwargs) result = func(*args, **kwargs)
if not result.name:
# Set the task name if not already set
if hasattr(result, "name") and not result.name:
result.name = func.__name__ result.name = func.__name__
# If this is a ConditionalTask, ensure it has a valid condition
if isinstance(result, ConditionalTask):
# If the condition is a boolean, wrap it in a function
if isinstance(result.condition, bool):
bool_value = result.condition
result.condition = lambda _: bool_value
# Get the previous task output if available
self = args[0] if args else None
if self and hasattr(self, "_method_outputs"):
previous_outputs = getattr(self, "_method_outputs", [])
previous_output = previous_outputs[-1] if previous_outputs else None
# If there's a previous output and it's a TaskOutput, check if we should execute
if previous_output and isinstance(previous_output, TaskOutput):
if not result.should_execute(previous_output):
# Return a skipped task output instead of the task
return result.get_skipped_task_output()
return result return result
return memoize(wrapper) return memoize(wrapper)

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable from typing import Any, Callable, Union, cast
from pydantic import Field from pydantic import Field
@@ -14,17 +14,23 @@ class ConditionalTask(Task):
""" """
condition: Callable[[TaskOutput], bool] = Field( condition: Callable[[TaskOutput], bool] = Field(
default=None, default=lambda _: True, # Default to always execute
description="Maximum number of retries for an agent to execute a task when an error occurs.", description="Function that determines whether the task should be executed or a boolean value.",
) )
def __init__( def __init__(
self, self,
condition: Callable[[Any], bool], condition: Union[Callable[[Any], bool], bool],
**kwargs, **kwargs,
): ):
super().__init__(**kwargs) super().__init__(**kwargs)
self.condition = condition
# If condition is a boolean, wrap it in a function that always returns that boolean
if isinstance(condition, bool):
bool_value = condition
self.condition = lambda _: bool_value
else:
self.condition = cast(Callable[[TaskOutput], bool], condition)
def should_execute(self, context: TaskOutput) -> bool: def should_execute(self, context: TaskOutput) -> bool:
""" """

View File

@@ -0,0 +1,190 @@
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Crew, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
# Create mock agents for testing
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You are a researcher with expertise in finding information.",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You are a writer with expertise in creating engaging content.",
)
def test_conditional_task_with_boolean_false():
"""Test that a conditional task with a boolean False condition is skipped."""
task1 = Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
# Use a boolean False directly as the condition
task2 = ConditionalTask(
description="Conditional task that should be skipped",
expected_output="This should not be executed",
agent=writer,
condition=False,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
)
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task 1 description",
raw="Task 1 output",
agent="Researcher",
)
result = crew.kickoff()
# Only the first task should be executed
assert mock_execute_sync.call_count == 1
# The conditional task should be skipped
assert task2.output is not None
assert task2.output.raw == ""
# The final output should be from the first task
assert result.raw.startswith("Task 1 output")
def test_conditional_task_with_boolean_true():
"""Test that a conditional task with a boolean True condition is executed."""
task1 = Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
# Use a boolean True directly as the condition
task2 = ConditionalTask(
description="Conditional task that should be executed",
expected_output="This should be executed",
agent=writer,
condition=True,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
)
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task output",
raw="Task output",
agent="Agent",
)
crew.kickoff()
# Both tasks should be executed
assert mock_execute_sync.call_count == 2
def test_multiple_sequential_conditional_tasks():
"""Test that multiple conditional tasks in sequence work correctly."""
task1 = Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
# First conditional task (will be executed)
task2 = ConditionalTask(
description="First conditional task",
expected_output="First conditional output",
agent=writer,
condition=True,
)
# Second conditional task (will be skipped)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Second conditional output",
agent=researcher,
condition=False,
)
# Third conditional task (will be executed)
task4 = ConditionalTask(
description="Third conditional task",
expected_output="Third conditional output",
agent=writer,
condition=True,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3, task4],
)
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task output",
raw="Task output",
agent="Agent",
)
result = crew.kickoff()
# Tasks 1, 2, and 4 should be executed (task 3 is skipped)
assert mock_execute_sync.call_count == 3
# Task 3 should be skipped
assert task3.output is not None
assert task3.output.raw == ""
def test_last_task_conditional():
"""Test that a conditional task at the end of the task list works correctly."""
task1 = Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
# Last task is conditional and will be skipped
task2 = ConditionalTask(
description="Last conditional task",
expected_output="Last conditional output",
agent=writer,
condition=False,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
)
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task 1 output",
raw="Task 1 output",
agent="Researcher",
)
result = crew.kickoff()
# Only the first task should be executed
assert mock_execute_sync.call_count == 1
# The conditional task should be skipped
assert task2.output is not None
assert task2.output.raw == ""
# The final output should be from the first task
assert result.raw.startswith("Task 1 output")

View File

@@ -0,0 +1,152 @@
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Task
from crewai.flow import Flow, listen, start
from crewai.project.annotations import task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
# Create mock agents for testing
researcher = Agent(
role="Researcher",
goal="Research information",
backstory="You are a researcher with expertise in finding information.",
)
writer = Agent(
role="Writer",
goal="Write content",
backstory="You are a writer with expertise in creating engaging content.",
)
class TestFlowWithConditionalTasks(Flow):
"""Test flow with conditional tasks."""
@start()
@task
def initial_task(self):
"""Initial task that always executes."""
return Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
@listen(initial_task)
@task
def conditional_task_false(self):
"""Conditional task that should be skipped."""
return ConditionalTask(
description="Conditional task that should be skipped",
expected_output="This should not be executed",
agent=writer,
condition=False,
)
@listen(initial_task)
@task
def conditional_task_true(self):
"""Conditional task that should be executed."""
return ConditionalTask(
description="Conditional task that should be executed",
expected_output="This should be executed",
agent=writer,
condition=True,
)
@listen(conditional_task_true)
@task
def final_task(self):
"""Final task that executes after the conditional task."""
return Task(
description="Final task",
expected_output="Final output",
agent=researcher,
)
def test_flow_with_conditional_tasks():
"""Test that conditional tasks work correctly in a Flow."""
flow = TestFlowWithConditionalTasks()
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task output",
raw="Task output",
agent="Agent",
)
flow.kickoff()
# The initial task, conditional_task_true, and final_task should be executed
# conditional_task_false should be skipped
assert mock_execute_sync.call_count == 3
class TestFlowWithSequentialConditionalTasks(Flow):
"""Test flow with sequential conditional tasks."""
@start()
@task
def initial_task(self):
"""Initial task that always executes."""
return Task(
description="Initial task",
expected_output="Initial output",
agent=researcher,
)
@listen(initial_task)
@task
def conditional_task_1(self):
"""First conditional task that should be executed."""
return ConditionalTask(
description="First conditional task",
expected_output="First conditional output",
agent=writer,
condition=True,
)
@listen(conditional_task_1)
@task
def conditional_task_2(self):
"""Second conditional task that should be skipped."""
return ConditionalTask(
description="Second conditional task",
expected_output="Second conditional output",
agent=researcher,
condition=False,
)
@listen(conditional_task_2)
@task
def conditional_task_3(self):
"""Third conditional task that should be executed."""
return ConditionalTask(
description="Third conditional task",
expected_output="Third conditional output",
agent=writer,
condition=True,
)
def test_flow_with_sequential_conditional_tasks():
"""Test that sequential conditional tasks work correctly in a Flow."""
flow = TestFlowWithSequentialConditionalTasks()
with patch.object(Task, "execute_sync") as mock_execute_sync:
mock_execute_sync.return_value = TaskOutput(
description="Task output",
raw="Task output",
agent="Agent",
)
flow.kickoff()
# The initial_task and conditional_task_1 should be executed
# conditional_task_2 should be skipped, and since it's skipped,
# conditional_task_3 should not be triggered
assert mock_execute_sync.call_count == 2