Compare commits

...

6 Commits

Author SHA1 Message Date
Brandon Hancock
eb13dbb223 Clean up docs 2024-10-21 17:22:48 -04:00
Brandon Hancock
056a8e734b make flow kickoff sync 2024-10-21 17:09:59 -04:00
Brandon Hancock
70fdfa1d8d Template fix 2024-10-21 15:25:37 -04:00
Brandon Hancock
db0f2d4697 Update docs and scripts 2024-10-21 11:44:41 -04:00
Brandon Hancock
ec70e76ab6 propogate changes 2024-10-21 11:10:10 -04:00
Brandon Hancock
698cac066f simplify flow 2024-10-21 10:51:59 -04:00
6 changed files with 111 additions and 160 deletions

View File

@@ -23,9 +23,9 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
@@ -67,19 +67,19 @@ class ExampleFlow(Flow):
return fun_fact
async def main():
flow = ExampleFlow()
result = await flow.kickoff()
print(f"Generated fun fact: {result}")
flow = ExampleFlow()
result = flow.kickoff()
asyncio.run(main())
print(f"Generated fun fact: {result}")
```
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console.
**Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API.
### @start()
The `@start()` decorator is used to mark a method as the starting point of a Flow. When a Flow is started, all the methods decorated with `@start()` are executed in parallel. You can have multiple start methods in a Flow, and they will all be executed when the Flow is started.
@@ -119,7 +119,6 @@ Here's how you can access the final output:
<CodeGroup>
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@@ -131,26 +130,24 @@ class OutputExampleFlow(Flow):
def second_method(self, first_output):
return f"Second method received: {first_output}"
async def main():
flow = OutputExampleFlow()
final_output = await flow.kickoff()
print("---- Final Output ----")
print(final_output)
asyncio.run(main())
```
flow = OutputExampleFlow()
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
````
``` text Output
---- Final Output ----
Second method received: Output from first_method
```
````
</CodeGroup>
In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow.
In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow.
The `kickoff()` method will return the final output, which is then printed to the console.
#### Accessing and Updating State
In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution.
@@ -160,7 +157,6 @@ Here's an example of how to update and access the state:
<CodeGroup>
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
@@ -181,42 +177,38 @@ class StateExampleFlow(Flow[ExampleState]):
self.state.counter += 1
return self.state.message
async def main():
flow = StateExampleFlow()
final_output = await flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
asyncio.run(main())
flow = StateExampleFlow()
final_output = flow.kickoff()
print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)
```
``` text Output
```text Output
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
```
</CodeGroup>
In this example, the state is updated by both `first_method` and `second_method`.
In this example, the state is updated by both `first_method` and `second_method`.
After the Flow has run, you can access the final state to see the updates made by these methods.
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems,
while also maintaining and accessing the state throughout the Flow's execution.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management,
allowing developers to choose the approach that best fits their application's needs.
### Unstructured State Management
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
class UntructuredExampleFlow(Flow):
@@ -239,12 +231,8 @@ class UntructuredExampleFlow(Flow):
print(f"State after third_method: {self.state}")
async def main():
flow = UntructuredExampleFlow()
await flow.kickoff()
asyncio.run(main())
flow = UntructuredExampleFlow()
flow.kickoff()
```
**Key Points:**
@@ -254,12 +242,10 @@ asyncio.run(main())
### Structured State Management
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
@@ -288,12 +274,8 @@ class StructuredExampleFlow(Flow[ExampleState]):
print(f"State after third_method: {self.state}")
async def main():
flow = StructuredExampleFlow()
await flow.kickoff()
asyncio.run(main())
flow = StructuredExampleFlow()
flow.kickoff()
```
**Key Points:**
@@ -326,7 +308,6 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger
<CodeGroup>
```python Code
import asyncio
from crewai.flow.flow import Flow, listen, or_, start
class OrExampleFlow(Flow):
@@ -344,22 +325,19 @@ class OrExampleFlow(Flow):
print(f"Logger: {result}")
async def main():
flow = OrExampleFlow()
await flow.kickoff()
asyncio.run(main())
flow = OrExampleFlow()
flow.kickoff()
```
``` text Output
```text Output
Logger: Hello from the start method
Logger: Hello from the second method
```
</CodeGroup>
When you run this Flow, the `logger` method will be triggered by the output of either the `start_method` or the `second_method`.
When you run this Flow, the `logger` method will be triggered by the output of either the `start_method` or the `second_method`.
The `or_` function is used to listen to multiple methods and trigger the listener method when any of the specified methods emit an output.
### Conditional Logic: `and`
@@ -369,7 +347,6 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge
<CodeGroup>
```python Code
import asyncio
from crewai.flow.flow import Flow, and_, listen, start
class AndExampleFlow(Flow):
@@ -387,34 +364,28 @@ class AndExampleFlow(Flow):
print("---- Logger ----")
print(self.state)
async def main():
flow = AndExampleFlow()
await flow.kickoff()
asyncio.run(main())
flow = AndExampleFlow()
flow.kickoff()
```
``` text Output
```text Output
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
</CodeGroup>
When you run this Flow, the `logger` method will be triggered only when both the `start_method` and the `second_method` emit an output.
When you run this Flow, the `logger` method will be triggered only when both the `start_method` and the `second_method` emit an output.
The `and_` function is used to listen to multiple methods and trigger the listener method only when all the specified methods emit an output.
### Router
The `@router()` decorator in Flows allows you to define conditional routing logic based on the output of a method.
The `@router()` decorator in Flows allows you to define conditional routing logic based on the output of a method.
You can specify different routes based on the output of the method, allowing you to control the flow of execution dynamically.
<CodeGroup>
```python Code
import asyncio
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
@@ -446,15 +417,11 @@ class RouterFlow(Flow[ExampleState]):
print("Fourth method running")
async def main():
flow = RouterFlow()
await flow.kickoff()
asyncio.run(main())
flow = RouterFlow()
flow.kickoff()
```
``` text Output
```text Output
Starting the structured flow
Third method running
Fourth method running
@@ -462,16 +429,16 @@ Fourth method running
</CodeGroup>
In the above example, the `start_method` generates a random boolean value and sets it in the state.
The `second_method` uses the `@router()` decorator to define conditional routing logic based on the value of the boolean.
If the boolean is `True`, the method returns `"success"`, and if it is `False`, the method returns `"failed"`.
In the above example, the `start_method` generates a random boolean value and sets it in the state.
The `second_method` uses the `@router()` decorator to define conditional routing logic based on the value of the boolean.
If the boolean is `True`, the method returns `"success"`, and if it is `False`, the method returns `"failed"`.
The `third_method` and `fourth_method` listen to the output of the `second_method` and execute based on the returned value.
When you run this Flow, the output will change based on the random boolean value generated by the `start_method`.
## Adding Crews to Flows
Creating a flow with multiple crews in CrewAI is straightforward.
Creating a flow with multiple crews in CrewAI is straightforward.
You can generate a new CrewAI project that includes all the scaffolding needed to create a flow with multiple crews by running the following command:
@@ -485,22 +452,21 @@ This command will generate a new CrewAI project with the necessary folder struct
After running the `crewai create flow name_of_flow` command, you will see a folder structure similar to the following:
| Directory/File | Description |
|:---------------------------------|:------------------------------------------------------------------|
| `name_of_flow/` | Root directory for the flow. |
| ├── `crews/` | Contains directories for specific crews. |
| │ └── `poem_crew/` | Directory for the "poem_crew" with its configurations and scripts.|
| │ ├── `config/` | Configuration files directory for the "poem_crew". |
| │ ├── `agents.yaml` | YAML file defining the agents for "poem_crew". |
| │ └── `tasks.yaml` | YAML file defining the tasks for "poem_crew". |
| │ ├── `poem_crew.py` | Script for "poem_crew" functionality. |
| ├── `tools/` | Directory for additional tools used in the flow. |
| │ └── `custom_tool.py` | Custom tool implementation. |
| ├── `main.py` | Main script for running the flow. |
| ├── `README.md` | Project description and instructions. |
| ├── `pyproject.toml` | Configuration file for project dependencies and settings. |
| └── `.gitignore` | Specifies files and directories to ignore in version control. |
| Directory/File | Description |
| :--------------------- | :----------------------------------------------------------------- |
| `name_of_flow/` | Root directory for the flow. |
| ├── `crews/` | Contains directories for specific crews. |
| │ └── `poem_crew/` | Directory for the "poem_crew" with its configurations and scripts. |
| │ ├── `config/` | Configuration files directory for the "poem_crew". |
| │ ├── `agents.yaml` | YAML file defining the agents for "poem_crew". |
| │ └── `tasks.yaml` | YAML file defining the tasks for "poem_crew". |
| │ ├── `poem_crew.py` | Script for "poem_crew" functionality. |
| ├── `tools/` | Directory for additional tools used in the flow. |
| │ └── `custom_tool.py` | Custom tool implementation. |
| ├── `main.py` | Main script for running the flow. |
| ├── `README.md` | Project description and instructions. |
| ├── `pyproject.toml` | Configuration file for project dependencies and settings. |
| └── `.gitignore` | Specifies files and directories to ignore in version control. |
### Building Your Crews
@@ -520,7 +486,6 @@ Here's an example of how you can connect the `poem_crew` in the `main.py` file:
```python Code
#!/usr/bin/env python
import asyncio
from random import randint
from pydantic import BaseModel
@@ -536,14 +501,12 @@ class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
# Generate a number between 1 and 5
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
poem_crew = PoemCrew().crew()
result = poem_crew.kickoff(inputs={"sentence_count": self.state.sentence_count})
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
@@ -554,18 +517,17 @@ class PoemFlow(Flow[PoemState]):
with open("poem.txt", "w") as f:
f.write(self.state.poem)
async def run():
"""
Run the flow.
"""
def kickoff():
poem_flow = PoemFlow()
await poem_flow.kickoff()
poem_flow.kickoff()
def main():
asyncio.run(run())
def plot():
poem_flow = PoemFlow()
poem_flow.plot()
if __name__ == "__main__":
main()
kickoff()
```
In this example, the `PoemFlow` class defines a flow that generates a sentence count, uses the `PoemCrew` to generate a poem, and then saves the poem to a file. The flow is kicked off by calling the `kickoff()` method.
@@ -587,13 +549,13 @@ source .venv/bin/activate
After activating the virtual environment, you can run the flow by executing one of the following commands:
```bash
crewai flow run
crewai flow kickoff
```
or
```bash
uv run run_flow
uv run kickoff
```
The flow will execute, and you should see the output in the console.
@@ -657,13 +619,13 @@ By exploring these examples, you can gain insights into how to leverage CrewAI F
Also, check out our YouTube video on how to use flows in CrewAI below!
<iframe
width="560"
height="315"
src="https://www.youtube.com/embed/MTb5my6VOT8"
title="YouTube video player"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
<iframe
width="560"
height="315"
src="https://www.youtube.com/embed/MTb5my6VOT8"
title="YouTube video player"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>

View File

@@ -14,11 +14,11 @@ from .authentication.main import AuthenticationCommand
from .deploy.main import DeployCommand
from .evaluate_crew import evaluate_crew
from .install_crew import install_crew
from .kickoff_flow import kickoff_flow
from .plot_flow import plot_flow
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .run_crew import run_crew
from .run_flow import run_flow
from .tools.main import ToolCommand
from .train_crew import train_crew
from .update_crew import update_crew
@@ -304,11 +304,11 @@ def flow():
pass
@flow.command(name="run")
@flow.command(name="kickoff")
def flow_run():
"""Run the Flow."""
"""Kickoff the Flow."""
click.echo("Running the Flow")
run_flow()
kickoff_flow()
@flow.command(name="plot")

View File

@@ -3,11 +3,11 @@ import subprocess
import click
def run_flow() -> None:
def kickoff_flow() -> None:
"""
Run the flow by running a command in the UV environment.
Kickoff the flow by running a command in the UV environment.
"""
command = ["uv", "run", "run_flow"]
command = ["uv", "run", "kickoff"]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True)

View File

@@ -1,65 +1,53 @@
#!/usr/bin/env python
import asyncio
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
# Generate a number between 1 and 5
self.state.sentence_count = randint(1, 5)
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
print(f"State before poem: {self.state}")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
result = (
PoemCrew()
.crew()
.kickoff(inputs={"sentence_count": self.state.sentence_count})
)
print("Poem generated", result.raw)
self.state.poem = result.raw
print(f"State after generate_poem: {self.state}")
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
print(f"State before save_poem: {self.state}")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
print(f"State after save_poem: {self.state}")
async def run_flow():
"""
Run the flow.
"""
def kickoff():
poem_flow = PoemFlow()
await poem_flow.kickoff()
poem_flow.kickoff()
async def plot_flow():
"""
Plot the flow.
"""
def plot():
poem_flow = PoemFlow()
poem_flow.plot()
def main():
asyncio.run(run_flow())
def plot():
asyncio.run(plot_flow())
if __name__ == "__main__":
main()
kickoff()

View File

@@ -6,13 +6,11 @@ authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
"crewai[tools]>=0.74.2,<1.0.0",
"asyncio"
]
[project.scripts]
{{folder_name}} = "{{folder_name}}.main:main"
run_flow = "{{folder_name}}.main:main"
plot_flow = "{{folder_name}}.main:plot"
kickoff = "{{folder_name}}.main:kickoff"
plot = "{{folder_name}}.main:plot"
[build-system]
requires = ["hatchling"]

View File

@@ -190,7 +190,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
async def kickoff(self) -> Any:
def kickoff(self) -> Any:
return asyncio.run(self.kickoff_async())
async def kickoff_async(self) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")