WIP: conditional tasks. Added a test and the logic flow; the sequential process still needs cleanup to better follow DRY practices.

Lorenze Jay
2024-07-05 08:40:58 -07:00
parent bb33e1813d
commit 0cc37e0d72
6 changed files with 2812 additions and 28 deletions

View File

@@ -3,10 +3,14 @@ from typing import Callable, Optional, Any
 from pydantic import BaseModel
 from crewai.task import Task
+from crewai.tasks.task_output import TaskOutput


 class ConditionalTask(Task):
+    """
+    A task that can be conditionally executed based on the output of another task.
+    Note: This cannot be the only task you have in your crew and cannot be the first one, since it needs context from the previous task.
+    """
     condition: Optional[Callable[[BaseModel], bool]] = None

     def __init__(
@@ -19,7 +23,15 @@ class ConditionalTask(Task):
         self.condition = condition

     def should_execute(self, context: Any) -> bool:
-        print("TaskOutput", TaskOutput)
+        """
+        Determines whether the conditional task should be executed based on the provided context.
+
+        Args:
+            context (Any): The context or output from the previous task that will be evaluated by the condition.
+
+        Returns:
+            bool: True if the task should be executed, False otherwise.
+        """
         if self.condition:
             return self.condition(context)
         return True
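
For orientation, here is a rough usage sketch pieced together from this class and the tests added in crew_test.py below; the agents, task descriptions, and the has_enough_ideas helper are illustrative and not part of this commit. Per should_execute() and the sequential loop in crew.py, the condition receives the previous task's result.

# Hedged usage sketch (not from this commit); the agents and the condition
# helper are made up for illustration.
from crewai import Agent, Crew, Task
from crewai.conditional_task import ConditionalTask

researcher = Agent(role="Researcher", goal="Collect article ideas", backstory="Example agent")
writer = Agent(role="Writer", goal="Write highlight paragraphs", backstory="Example agent")

def has_enough_ideas(previous_result) -> bool:
    # Only run the follow-up task if the previous output lists at least 5 bullets.
    return str(previous_result).count("-") >= 5

ideas = Task(
    description="Come up with 5 interesting article ideas.",
    expected_output="5 bullet points.",
    agent=researcher,
)
highlights = ConditionalTask(
    description="Write one highlight paragraph per idea.",
    expected_output="One paragraph per idea.",
    condition=has_enough_ideas,
    agent=writer,
)

# Note from the docstring: a conditional task cannot be the only task or the first task.
crew = Crew(agents=[researcher, writer], tasks=[ideas, highlights])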

View File

@@ -408,21 +408,28 @@ class Crew(BaseModel):
         task_outputs: List[TaskOutput] = []
         futures: List[Tuple[Task, Future[TaskOutput]]] = []

-        for i, task in enumerate(self.tasks):
-            # if isinstance(task, ConditionalTask):
-            #     # print("task_outputs", task_outputs)
-            #     previous_output = task_outputs[-1] if task_outputs else None
-            #     print("previous_output", previous_output)
-            #     # print("previous_output type", type(previous_output))
-            #     if previous_output is not None:
-            #         if not task.should_execute(previous_output):
-            #             self._logger.log(
-            #                 "info",
-            #                 f"Skipping conditional task: {task.description}",
-            #                 color="yellow",
-            #             )
-            #             continue
-            if task.agent.allow_delegation:  # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
+        for task in self.tasks:
+            if isinstance(task, ConditionalTask):
+                if futures:
+                    task_outputs = []
+                    for future_task, future in futures:
+                        task_output = future.result()
+                        task_outputs.append(task_output)
+                        self._process_task_result(future_task, task_output)
+                    futures.clear()
+
+                previous_output = task_outputs[-1] if task_outputs else None
+                if previous_output is not None and not task.should_execute(
+                    previous_output.result()
+                ):
+                    self._logger.log(
+                        "info",
+                        f"Skipping conditional task: {task.description}",
+                        color="yellow",
+                    )
+                    continue
+
+            if task.agent and task.agent.allow_delegation:
                 agents_for_delegation = [
                     agent for agent in self.agents if agent != task.agent
                 ]
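
The hunk above also covers the async case: before a conditional task is evaluated, any pending futures are resolved so the latest output is available to the condition. A simplified standalone sketch of that pattern (plain Python, not the crewAI implementation; the task label and payload are made up):

from concurrent.futures import ThreadPoolExecutor

def drain(futures):
    # Resolve every pending (label, future) pair in order and empty the list,
    # mirroring the futures-draining step that runs before should_execute().
    outputs = [(label, future.result()) for label, future in futures]
    futures.clear()
    return outputs

with ThreadPoolExecutor() as pool:
    pending = [("research", pool.submit(lambda: "raw research notes"))]
    resolved = drain(pending)
    previous_output = resolved[-1][1] if resolved else None
    # Only now is there enough context to decide whether the conditional task runs.
    print(previous_output is not None)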
@@ -625,9 +632,17 @@ class Crew(BaseModel):
         """
         Formats the output of the crew execution.
         """
+        # breakpoint()
+        task_output = []
+        for task in self.tasks:
+            if task.output:
+                # print("task.output", task.output)
+                task_output.append(task.output.result())
+
         return CrewOutput(
             output=output,
-            tasks_output=[task.output for task in self.tasks if task],
+            # tasks_output=[task.output for task in self.tasks if task],
+            tasks_output=task_output,
             token_usage=token_usage,
         )

View File

@@ -1,4 +1,4 @@
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Union

 from pydantic import BaseModel, Field

@@ -8,7 +8,11 @@ from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs

 class CrewOutput(BaseModel):
     output: List[TaskOutput] = Field(description="Result of the final task")
-    tasks_output: list[TaskOutput] = Field(
+    # NOTE HERE
+    # tasks_output: list[TaskOutput] = Field(
+    #     description="Output of each task", default=[]
+    # )
+    tasks_output: list[Union[str, BaseModel, Dict[str, Any]]] = Field(
         description="Output of each task", default=[]
     )
     token_usage: Dict[str, Any] = Field(
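
Since tasks_output now carries plain results rather than TaskOutput objects, consumers dispatch on the union members. A minimal self-contained sketch (the render helper is illustrative; it assumes pydantic v2, which the test suite's pydantic_core import implies):

from typing import Any, Dict, Union
from pydantic import BaseModel

def render(entry: Union[str, BaseModel, Dict[str, Any]]) -> str:
    # Entries come straight from TaskOutput.result(): a pydantic model,
    # a JSON dict, or the raw string, depending on the task configuration.
    if isinstance(entry, BaseModel):
        return entry.model_dump_json()
    if isinstance(entry, dict):
        return str(entry)
    return entry

print(render({"ideas": ["a", "b"]}))
print(render("raw task output"))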

View File

@@ -29,13 +29,10 @@ class TaskOutput(BaseModel):
     def result(self) -> Union[str, BaseModel, Dict[str, Any]]:
         """Return the result of the task based on the available output."""
         if self.pydantic_output:
-            print("returns pydantic_output", self.pydantic_output)
             return self.pydantic_output
         elif self.json_output:
-            print("returns json_output", self.json_output)
             return self.json_output
         else:
-            print("return string out")
             return self.raw_output

     def __getitem__(self, key: str) -> Any:
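
result() prefers structured output: pydantic first, then JSON, then the raw string. A small sketch using only the fields exercised in the tests below (the values are illustrative):

from crewai.tasks.task_output import TaskOutput

out = TaskOutput(description="demo task", raw_output="plain text answer", agent="Researcher")
# Neither pydantic_output nor json_output is set, so result() falls back to raw_output.
print(out.result())  # -> "plain text answer"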

File diff suppressed because it is too large.

View File

@@ -3,13 +3,14 @@
 import json
 from concurrent.futures import Future
 from unittest import mock
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock

 import pydantic_core
 import pytest

 from crewai.agent import Agent
 from crewai.agents.cache import CacheHandler
+from crewai.conditional_task import ConditionalTask
 from crewai.crew import Crew
 from crewai.crews.crew_output import CrewOutput
 from crewai.memory.contextual.contextual_memory import ContextualMemory
@@ -559,7 +560,6 @@ def test_hierarchical_async_task_execution_completion():

 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_single_task_with_async_execution():
-
     researcher_agent = Agent(
         role="Researcher",
         goal="Make the best research and analysis on content about AI and AI agents",
@@ -713,7 +713,6 @@ def test_async_task_execution_call_count():
     ) as mock_execute_sync, patch.object(
         Task, "execute_async", return_value=mock_future
     ) as mock_execute_async:
-
         crew.kickoff()

     assert mock_execute_async.call_count == 2
@@ -1135,8 +1134,6 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():

 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_delegation_is_not_enabled_if_there_are_only_one_agent():
-    from unittest.mock import patch
-
     researcher = Agent(
         role="Researcher",
         goal="Make the best research and analysis on content about AI and AI agents",
@@ -1204,6 +1201,82 @@ def test_agent_usage_metrics_are_captured_for_sequential_process():
         assert crew.usage_metrics[key] > 0, f"Value for key '{key}' is zero"


+def test_conditional_task_requirement_breaks_when_singular_conditional_task():
+    task = ConditionalTask(
+        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
+        expected_output="5 bullet points with a paragraph for each idea.",
+    )
+
+    with pytest.raises(pydantic_core._pydantic_core.ValidationError):
+        Crew(
+            agents=[researcher, writer],
+            tasks=[task],
+        )
+
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_conditional_should_not_execute():
+    task1 = Task(description="Return hello", expected_output="say hi", agent=researcher)
+
+    condition_mock = MagicMock(return_value=False)
+    task2 = ConditionalTask(
+        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
+        expected_output="5 bullet points with a paragraph for each idea.",
+        condition=condition_mock,
+        agent=writer,
+    )
+
+    crew_met = Crew(
+        agents=[researcher, writer],
+        tasks=[task1, task2],
+    )
+
+    with patch.object(Task, "execute_sync") as mock_execute_sync:
+        mock_execute_sync.return_value = TaskOutput(
+            description="Task 1 description",
+            raw_output="Task 1 output",
+            agent="Researcher",
+        )
+
+        result = crew_met.kickoff()
+
+        assert mock_execute_sync.call_count == 1
+        assert condition_mock.call_count == 1
+        assert condition_mock() is False
+
+        assert task2.output is None
+        assert result.raw_output().startswith("Task 1 output")
+
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_conditional_should_execute():
+    task1 = Task(description="Return hello", expected_output="say hi", agent=researcher)
+
+    condition_mock = MagicMock(
+        return_value=True
+    )  # should execute this conditional task
+    task2 = ConditionalTask(
+        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
+        expected_output="5 bullet points with a paragraph for each idea.",
+        condition=condition_mock,
+        agent=writer,
+    )
+
+    crew_met = Crew(
+        agents=[researcher, writer],
+        tasks=[task1, task2],
+    )
+
+    with patch.object(Task, "execute_sync") as mock_execute_sync:
+        mock_execute_sync.return_value = TaskOutput(
+            description="Task 1 description",
+            raw_output="Task 1 output",
+            agent="Researcher",
+        )
+        crew_met.kickoff()
+
+        assert condition_mock.call_count == 1
+        assert condition_mock() is True
+        assert mock_execute_sync.call_count == 2
+
+
 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_agent_usage_metrics_are_captured_for_hierarchical_process():
     from langchain_openai import ChatOpenAI