mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
* docs(cli): document device-code login and config reset guidance; renumber sections * docs(cli): fix duplicate numbering (renumber Login/API Keys/Configuration sections) * docs: Fix webhook documentation to include meta dict in all webhook payloads - Add note explaining that meta objects from kickoff requests are included in all webhook payloads - Update webhook examples to show proper payload structure including meta field - Fix webhook examples to match actual API implementation - Apply changes to English, Korean, and Portuguese documentation Resolves the documentation gap where meta dict passing to webhooks was not documented despite being implemented in the API. * WIP: CrewAI docs theme, changelog, GEO, localization * docs(cli): fix merge markers; ensure mode: "wide"; convert ASCII tables to Markdown (en/pt-BR/ko) * docs: add group icons across locales; split Automation/Integrations; update tools overviews and links
90 lines
2.6 KiB
Plaintext
90 lines
2.6 KiB
Plaintext
---
|
|
title: Conditional Tasks
|
|
description: Learn how to use conditional tasks in a crewAI kickoff
|
|
icon: diagram-subtask
|
|
mode: "wide"
|
|
---
|
|
|
|
## Introduction
|
|
|
|
Conditional Tasks in CrewAI let a workflow adapt dynamically based on the outcomes of previous tasks.
|
|
This powerful feature enables crews to make decisions and execute tasks selectively, enhancing the flexibility and efficiency of your AI-driven processes.
|
|
|
|
## Example Usage
|
|
|
|
```python Code
|
|
from typing import List
|
|
from pydantic import BaseModel
|
|
from crewai import Agent, Crew
|
|
from crewai.tasks.conditional_task import ConditionalTask
|
|
from crewai.tasks.task_output import TaskOutput
|
|
from crewai.task import Task
|
|
from crewai_tools import SerperDevTool
|
|
|
|
# Define a condition function for the conditional task
|
|
# If the function returns False, the task is skipped; if it returns True, the task is executed.
|
|
def is_data_missing(output: TaskOutput) -> bool:
    """Decide whether the conditional task should be executed.

    Args:
        output: Output of the preceding task. Its ``pydantic`` attribute is
            read and, when present, is expected to expose an ``events`` list.

    Returns:
        True (execute the conditional task) when no structured output is
        available or it contains fewer than 10 events; False (skip the
        conditional task) otherwise.
    """
    structured = output.pydantic
    if structured is None:
        # Nothing was parsed into a model, so there are no events to count:
        # treat the data as missing instead of failing on attribute access.
        return True
    # Fewer than 10 events -> True -> the conditional task runs.
    return len(structured.events) < 10
|
|
|
|
# Define the agents
|
|
# Fetcher: the only agent created with a tool (the Serper search tool).
data_fetcher_agent = Agent(
    verbose=True,
    role="Data Fetcher",
    goal="Fetch data online using Serper tool",
    backstory="Backstory 1",
    tools=[SerperDevTool()],
)

# Processor: created without tools.
data_processor_agent = Agent(
    verbose=True,
    role="Data Processor",
    goal="Process fetched data",
    backstory="Backstory 2",
)

# Summarizer: created without tools.
summary_generator_agent = Agent(
    verbose=True,
    role="Summary Generator",
    goal="Generate summary from fetched data",
    backstory="Backstory 3",
)
|
|
|
|
class EventOutput(BaseModel):
    """Structured task output: a list of events, each stored as a string."""

    # One string per event.
    events: List[str]
|
|
|
|
# First task: fetch events; structured output is requested via
# output_pydantic=EventOutput.
task1 = Task(
    agent=data_fetcher_agent,
    description="Fetch data about events in San Francisco using Serper tool",
    expected_output="List of 10 things to do in SF this week",
    output_pydantic=EventOutput,
)
|
|
|
|
# Conditional step: `condition` is given the is_data_missing callable, which
# decides whether this task is executed or skipped.
# NOTE(review): the description asks for the Serper tool, but
# data_processor_agent is created without any tools - confirm the intended agent.
conditional_task = ConditionalTask(
    agent=data_processor_agent,
    condition=is_data_missing,
    description="""
        Check if data is missing. If we have less than 10 events,
        fetch more events using Serper tool so that
        we have a total of 10 events in SF this week..
    """,
    expected_output="List of 10 Things to do in SF this week",
)
|
|
|
|
# Final task: summarize the events gathered by the earlier tasks.
task3 = Task(
    description="Generate summary of events in San Francisco from fetched data",
    # expected_output must describe the same deliverable as the description:
    # a summary of the fetched San Francisco events.
    expected_output="A summary of the events happening in San Francisco this week, based on the fetched data.",
    agent=summary_generator_agent,
)
|
|
|
|
# Create a crew with the tasks
|
|
# Assemble the crew; tasks are listed as fetch, conditional step, summary.
crew = Crew(
    tasks=[task1, conditional_task, task3],
    agents=[data_fetcher_agent, data_processor_agent, summary_generator_agent],
    planning=True,
    verbose=True,
)

# Kick off the crew and print whatever it returns.
result = crew.kickoff()
print("results", result)
|
|
``` |