Add more tests. Clean up docs. Improve conditional task

This commit is contained in:
Brandon Hancock
2024-07-17 11:03:11 -04:00
parent cb720143c7
commit 36efa172ee
6 changed files with 3682 additions and 33 deletions

View File

@@ -4,8 +4,8 @@ description: Learn how to use conditional tasks in a crewAI kickoff
---
## Introduction
Conditional Tasks in crewAI allow for dynamic workflow adaptation based on the outcomes of previous tasks. This powerful feature enables crews to make decisions and execute tasks selectively, enhancing the flexibility and efficiency of your AI-driven processes.
Conditional Tasks in crewAI allow for dynamic workflow adaptation based on the outcomes of previous tasks. This powerful feature enables crews to make decisions and execute tasks selectively, enhancing the flexibility and efficiency of your AI-driven processes.
```python
from typing import List
@@ -18,12 +18,10 @@ from crewai.task import Task
from crewai_tools import SerperDevTool
# Define a condition function for the conditional task - if false task will be skipped, true, then execute task
def is_data_fetched(output: TaskOutput) -> bool:
if len(output.pydantic.events) >= 10: # this will skip this task
return False
return True
# Define a condition function for the conditional task
# if false task will be skipped, true, then execute task
def is_data_missing(output: TaskOutput) -> bool:
return len(output.pydantic.events) < 10: # this will skip this task
# Define the agents
data_fetcher_agent = Agent(
@@ -54,7 +52,6 @@ class EventOutput(BaseModel):
task1 = Task(
name="Data Fetching Task",
description="Fetch data about events in San Francisco using Serper tool",
expected_output="List of 10 things to do in SF this week",
agent=data_fetcher_agent,
@@ -62,15 +59,17 @@ task1 = Task(
)
conditional_task = ConditionalTask(
name="Data Processing Task",
description="Process data if data fetching is successful",
expected_output="List of 11 Things to do in SF this week ",
condition=is_data_fetched,
description="""
Check if data is missing. If we have less than 10 events,
fetch more events using Serper tool so that
we have a total of 10 events in SF this week..
""",
expected_output="List of 10 Things to do in SF this week ",
condition=is_data_missing,
agent=data_processor_agent,
)
task3 = Task(
name="Summary Generation Task",
description="Generate summary of events in San Francisco from fetched data",
expected_output="summary_generated",
agent=summary_generator_agent,
@@ -85,4 +84,4 @@ crew = Crew(
result = crew.kickoff()
print("results", result)
```
```

View File

@@ -39,12 +39,11 @@ from crewai.utilities.constants import (
TRAINING_DATA_FILE,
)
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
)
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
@@ -310,7 +309,7 @@ class Crew(BaseModel):
@model_validator(mode="after")
def validate_async_tasks_not_async(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
"""Ensure that ConditionalTask is not async."""
for task in self.tasks:
if task.async_execution and isinstance(task, ConditionalTask):
raise PydanticCustomError(
@@ -704,12 +703,7 @@ class Crew(BaseModel):
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = TaskOutput(
description=task.description,
raw="",
agent=task.agent.role if task.agent else "",
output_format=OutputFormat.RAW,
)
skipped_task_output = task.get_skipped_task_output()
if not was_replayed:
self._store_execution_log(task, skipped_task_output, task_index)

View File

@@ -1,7 +1,9 @@
from typing import Callable, Any
from typing import Any, Callable
from pydantic import Field
from crewai.task import Task
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
@@ -37,3 +39,11 @@ class ConditionalTask(Task):
if self.condition:
return self.condition(context)
return True
def get_skipped_task_output(self):
return TaskOutput(
description=self.description,
raw="",
agent=self.agent.role if self.agent else "",
output_format=OutputFormat.RAW,
)

View File

@@ -0,0 +1,151 @@
interactions:
- request:
body: '{"messages": [{"content": "You are Researcher. You''re an expert researcher,
specialized in technology, software engineering, AI and startups. You work as
a freelancer and is now working on doing research and analysis for a new customer.\nYour
personal goal is: Make the best research and analysis on content about AI and
AI agentsTo give my best complete final answer to the task use the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: my best complete
final answer to the task.\nYour final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!\nCurrent Task: Say Hi\n\nThis is the expect criteria for
your final answer: Hi \n you MUST return the actual complete content as the
final answer, not a summary.\n\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\n\nThought:\n",
"role": "user"}], "model": "gpt-4o", "n": 1, "stop": ["\nObservation"], "stream":
true, "temperature": 0.7}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1072'
content-type:
- application/json
cookie:
- _cfuvid=Nxp5RE8CN2EyHxMIvB_SaTizIH5w0eWt9SPilMuIjMk-1721227661802-0.0.1.1-604800000;
__cf_bm=jadAYV2gh7qPDzgKO9A4JzJTaI9c2fnnjxloIQZeOIw-1721227661-1.0.1.1-apaA8kQyGiEV3kOuXHe8z1zeyvxd_jBHCQpdqWirUlylrUo.uRZjRDueI.sSXS4hXoWkyIW6kIMt7lamQM2mdw
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.35.10
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.35.10
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"I"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
now"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
give"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
a"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
great"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
answer"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":".\n"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"Final"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
Answer"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":":"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{"content":"
Hi"},"logprobs":null,"finish_reason":null}]}
data: {"id":"chatcmpl-9m0FB4Oy6X0apYX2wcQ30rTBkmtxT","object":"chat.completion.chunk","created":1721227709,"model":"gpt-4o-2024-05-13","system_fingerprint":"fp_c4e5b6fa31","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
data: [DONE]
'
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8a4b087e7d024593-ATL
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Wed, 17 Jul 2024 14:48:29 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '142'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=15552000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '30000000'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '29999753'
x-ratelimit-reset-requests:
- 6ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_75100793afc289eaf8b56127e1cc0532
status:
code: 200
message: OK
version: 1

File diff suppressed because it is too large Load Diff

View File

@@ -8,15 +8,14 @@ from unittest.mock import MagicMock, patch
import pydantic_core
import pytest
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.tasks.conditional_task import ConditionalTask
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import Logger, RPMController
@@ -918,9 +917,7 @@ async def test_kickoff_async_basic_functionality_and_output():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_async_kickoff_for_each_async_basic_functionality_and_output():
"""Tests the basic functionality and output of akickoff_for_each_async."""
from unittest.mock import patch
"""Tests the basic functionality and output of kickoff_for_each_async."""
inputs = [
{"topic": "dog"},
{"topic": "cat"},
@@ -946,8 +943,13 @@ async def test_async_kickoff_for_each_async_basic_functionality_and_output():
agent=agent,
)
async def mock_kickoff_async(**kwargs):
input_data = kwargs.get("inputs")
index = [input_["topic"] for input_ in inputs].index(input_data["topic"])
return expected_outputs[index]
with patch.object(
Crew, "kickoff_async", side_effect=expected_outputs
Crew, "kickoff_async", side_effect=mock_kickoff_async
) as mock_kickoff_async:
crew = Crew(agents=[agent], tasks=[task])
@@ -2281,10 +2283,8 @@ def test_conditional_task_requirement_breaks_when_singular_conditional_task():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_last_task():
def test_conditional_task_last_task_when_conditional_is_true():
def condition_fn(output) -> bool:
if output.raw == "Hi":
return False
return True
task1 = Task(
@@ -2304,6 +2304,34 @@ def test_conditional_task_last_task():
tasks=[task1, task2],
)
result = crew.kickoff()
assert result.raw.startswith(
"1. **The Rise of AI Agents in Customer Service: Revolutionizing Customer Interactions**"
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_last_task_when_conditional_is_false():
def condition_fn(output) -> bool:
return False
task1 = Task(
description="Say Hi",
expected_output="Hi",
agent=researcher,
)
task2 = ConditionalTask(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
condition=condition_fn,
agent=writer,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
)
result = crew.kickoff()
print(result.raw)
assert result.raw == "Hi"