|
|
|
|
@@ -23,9 +23,9 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
|
|
|
|
|
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
from litellm import completion
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -67,19 +67,19 @@ class ExampleFlow(Flow):
|
|
|
|
|
return fun_fact
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = ExampleFlow()
|
|
|
|
|
result = await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
print(f"Generated fun fact: {result}")
|
|
|
|
|
flow = ExampleFlow()
|
|
|
|
|
result = flow.kickoff()
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
print(f"Generated fun fact: {result}")
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
|
|
|
|
|
|
|
|
|
|
When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console.
|
|
|
|
|
|
|
|
|
|
**Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API.
|
|
|
|
|
|
|
|
|
|
### @start()
|
|
|
|
|
|
|
|
|
|
The `@start()` decorator is used to mark a method as the starting point of a Flow. When a Flow is started, all the methods decorated with `@start()` are executed in parallel. You can have multiple start methods in a Flow, and they will all be executed when the Flow is started.
|
|
|
|
|
@@ -119,7 +119,6 @@ Here's how you can access the final output:
|
|
|
|
|
|
|
|
|
|
<CodeGroup>
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
|
|
|
|
|
|
class OutputExampleFlow(Flow):
|
|
|
|
|
@@ -131,26 +130,24 @@ class OutputExampleFlow(Flow):
|
|
|
|
|
def second_method(self, first_output):
|
|
|
|
|
return f"Second method received: {first_output}"
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = OutputExampleFlow()
|
|
|
|
|
final_output = await flow.kickoff()
|
|
|
|
|
print("---- Final Output ----")
|
|
|
|
|
print(final_output)
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
```
|
|
|
|
|
flow = OutputExampleFlow()
|
|
|
|
|
final_output = flow.kickoff()
|
|
|
|
|
|
|
|
|
|
print("---- Final Output ----")
|
|
|
|
|
print(final_output)
|
|
|
|
|
````
|
|
|
|
|
|
|
|
|
|
``` text Output
|
|
|
|
|
---- Final Output ----
|
|
|
|
|
Second method received: Output from first_method
|
|
|
|
|
```
|
|
|
|
|
````
|
|
|
|
|
|
|
|
|
|
</CodeGroup>
|
|
|
|
|
|
|
|
|
|
In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow.
|
|
|
|
|
The `kickoff()` method will return the final output, which is then printed to the console.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Accessing and Updating State
|
|
|
|
|
|
|
|
|
|
In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution.
|
|
|
|
|
@@ -160,7 +157,6 @@ Here's an example of how to update and access the state:
|
|
|
|
|
<CodeGroup>
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
|
|
|
|
|
@@ -181,21 +177,19 @@ class StateExampleFlow(Flow[ExampleState]):
|
|
|
|
|
self.state.counter += 1
|
|
|
|
|
return self.state.message
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = StateExampleFlow()
|
|
|
|
|
final_output = await flow.kickoff()
|
|
|
|
|
print(f"Final Output: {final_output}")
|
|
|
|
|
print("Final State:")
|
|
|
|
|
print(flow.state)
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = StateExampleFlow()
|
|
|
|
|
final_output = flow.kickoff()
|
|
|
|
|
print(f"Final Output: {final_output}")
|
|
|
|
|
print("Final State:")
|
|
|
|
|
print(flow.state)
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
``` text Output
|
|
|
|
|
```text Output
|
|
|
|
|
Final Output: Hello from first_method - updated by second_method
|
|
|
|
|
Final State:
|
|
|
|
|
counter=2 message='Hello from first_method - updated by second_method'
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
</CodeGroup>
|
|
|
|
|
|
|
|
|
|
In this example, the state is updated by both `first_method` and `second_method`.
|
|
|
|
|
@@ -215,8 +209,6 @@ In unstructured state management, all state is stored in the `state` attribute o
|
|
|
|
|
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
|
|
|
|
|
|
class UntructuredExampleFlow(Flow):
|
|
|
|
|
@@ -239,12 +231,8 @@ class UntructuredExampleFlow(Flow):
|
|
|
|
|
print(f"State after third_method: {self.state}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = UntructuredExampleFlow()
|
|
|
|
|
await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = UntructuredExampleFlow()
|
|
|
|
|
flow.kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
**Key Points:**
|
|
|
|
|
@@ -258,8 +246,6 @@ Structured state management leverages predefined schemas to ensure consistency a
|
|
|
|
|
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
from crewai.flow.flow import Flow, listen, start
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
|
|
|
|
|
@@ -288,12 +274,8 @@ class StructuredExampleFlow(Flow[ExampleState]):
|
|
|
|
|
print(f"State after third_method: {self.state}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = StructuredExampleFlow()
|
|
|
|
|
await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = StructuredExampleFlow()
|
|
|
|
|
flow.kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
**Key Points:**
|
|
|
|
|
@@ -326,7 +308,6 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger
|
|
|
|
|
<CodeGroup>
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
from crewai.flow.flow import Flow, listen, or_, start
|
|
|
|
|
|
|
|
|
|
class OrExampleFlow(Flow):
|
|
|
|
|
@@ -344,15 +325,12 @@ class OrExampleFlow(Flow):
|
|
|
|
|
print(f"Logger: {result}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = OrExampleFlow()
|
|
|
|
|
await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = OrExampleFlow()
|
|
|
|
|
flow.kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
``` text Output
|
|
|
|
|
```text Output
|
|
|
|
|
Logger: Hello from the start method
|
|
|
|
|
Logger: Hello from the second method
|
|
|
|
|
```
|
|
|
|
|
@@ -369,7 +347,6 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge
|
|
|
|
|
<CodeGroup>
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
from crewai.flow.flow import Flow, and_, listen, start
|
|
|
|
|
|
|
|
|
|
class AndExampleFlow(Flow):
|
|
|
|
|
@@ -387,16 +364,11 @@ class AndExampleFlow(Flow):
|
|
|
|
|
print("---- Logger ----")
|
|
|
|
|
print(self.state)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = AndExampleFlow()
|
|
|
|
|
await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = AndExampleFlow()
|
|
|
|
|
flow.kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
``` text Output
|
|
|
|
|
```text Output
|
|
|
|
|
---- Logger ----
|
|
|
|
|
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
|
|
|
|
|
```
|
|
|
|
|
@@ -414,7 +386,6 @@ You can specify different routes based on the output of the method, allowing you
|
|
|
|
|
<CodeGroup>
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
import asyncio
|
|
|
|
|
import random
|
|
|
|
|
from crewai.flow.flow import Flow, listen, router, start
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
@@ -446,15 +417,11 @@ class RouterFlow(Flow[ExampleState]):
|
|
|
|
|
print("Fourth method running")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
|
|
flow = RouterFlow()
|
|
|
|
|
await flow.kickoff()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
asyncio.run(main())
|
|
|
|
|
flow = RouterFlow()
|
|
|
|
|
flow.kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
``` text Output
|
|
|
|
|
```text Output
|
|
|
|
|
Starting the structured flow
|
|
|
|
|
Third method running
|
|
|
|
|
Fourth method running
|
|
|
|
|
@@ -486,10 +453,10 @@ This command will generate a new CrewAI project with the necessary folder struct
|
|
|
|
|
After running the `crewai create flow name_of_flow` command, you will see a folder structure similar to the following:
|
|
|
|
|
|
|
|
|
|
| Directory/File | Description |
|
|
|
|
|
|:---------------------------------|:------------------------------------------------------------------|
|
|
|
|
|
| :--------------------- | :----------------------------------------------------------------- |
|
|
|
|
|
| `name_of_flow/` | Root directory for the flow. |
|
|
|
|
|
| ├── `crews/` | Contains directories for specific crews. |
|
|
|
|
|
| │ └── `poem_crew/` | Directory for the "poem_crew" with its configurations and scripts.|
|
|
|
|
|
| │ └── `poem_crew/` | Directory for the "poem_crew" with its configurations and scripts. |
|
|
|
|
|
| │ ├── `config/` | Configuration files directory for the "poem_crew". |
|
|
|
|
|
| │ │ ├── `agents.yaml` | YAML file defining the agents for "poem_crew". |
|
|
|
|
|
| │ │ └── `tasks.yaml` | YAML file defining the tasks for "poem_crew". |
|
|
|
|
|
@@ -501,7 +468,6 @@ After running the `crewai create flow name_of_flow` command, you will see a fold
|
|
|
|
|
| ├── `pyproject.toml` | Configuration file for project dependencies and settings. |
|
|
|
|
|
| └── `.gitignore` | Specifies files and directories to ignore in version control. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### Building Your Crews
|
|
|
|
|
|
|
|
|
|
In the `crews` folder, you can define multiple crews. Each crew will have its own folder containing configuration files and the crew definition file. For example, the `poem_crew` folder contains:
|
|
|
|
|
@@ -520,7 +486,6 @@ Here's an example of how you can connect the `poem_crew` in the `main.py` file:
|
|
|
|
|
|
|
|
|
|
```python Code
|
|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
import asyncio
|
|
|
|
|
from random import randint
|
|
|
|
|
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
|
@@ -536,14 +501,12 @@ class PoemFlow(Flow[PoemState]):
|
|
|
|
|
@start()
|
|
|
|
|
def generate_sentence_count(self):
|
|
|
|
|
print("Generating sentence count")
|
|
|
|
|
# Generate a number between 1 and 5
|
|
|
|
|
self.state.sentence_count = randint(1, 5)
|
|
|
|
|
|
|
|
|
|
@listen(generate_sentence_count)
|
|
|
|
|
def generate_poem(self):
|
|
|
|
|
print("Generating poem")
|
|
|
|
|
poem_crew = PoemCrew().crew()
|
|
|
|
|
result = poem_crew.kickoff(inputs={"sentence_count": self.state.sentence_count})
|
|
|
|
|
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
|
|
|
|
|
|
|
|
|
|
print("Poem generated", result.raw)
|
|
|
|
|
self.state.poem = result.raw
|
|
|
|
|
@@ -554,18 +517,17 @@ class PoemFlow(Flow[PoemState]):
|
|
|
|
|
with open("poem.txt", "w") as f:
|
|
|
|
|
f.write(self.state.poem)
|
|
|
|
|
|
|
|
|
|
async def run():
|
|
|
|
|
"""
|
|
|
|
|
Run the flow.
|
|
|
|
|
"""
|
|
|
|
|
def kickoff():
|
|
|
|
|
poem_flow = PoemFlow()
|
|
|
|
|
await poem_flow.kickoff()
|
|
|
|
|
poem_flow.kickoff()
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
|
|
|
|
def plot():
|
|
|
|
|
poem_flow = PoemFlow()
|
|
|
|
|
poem_flow.plot()
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
main()
|
|
|
|
|
kickoff()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
In this example, the `PoemFlow` class defines a flow that generates a sentence count, uses the `PoemCrew` to generate a poem, and then saves the poem to a file. The flow is kicked off by calling the `kickoff()` method.
|
|
|
|
|
@@ -587,13 +549,13 @@ source .venv/bin/activate
|
|
|
|
|
After activating the virtual environment, you can run the flow by executing one of the following commands:
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
crewai flow run
|
|
|
|
|
crewai flow kickoff
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
or
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
uv run run_flow
|
|
|
|
|
uv run kickoff
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
The flow will execute, and you should see the output in the console.
|
|
|
|
|
|