mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
Working on docs.
This commit is contained in:
@@ -14,4 +14,203 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
|
||||
|
||||
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
|
||||
|
||||
##
|
||||
## Getting Started
|
||||
|
||||
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from openai import OpenAI
|
||||
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
client = OpenAI()
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
@start()
|
||||
def generate_city(self):
|
||||
print("Starting flow")
|
||||
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Return the name of a random city in the world.",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
random_city = response.choices[0].message.content
|
||||
print("---- Random City ----")
|
||||
print(random_city)
|
||||
return random_city
|
||||
|
||||
@listen(generate_city)
|
||||
def generate_fun_fact(self, random_city):
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": f"Tell me a fun fact about {random_city}",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
fun_fact = response.choices[0].message.content
|
||||
print("---- Fun Fact ----")
|
||||
print(fun_fact)
|
||||
|
||||
|
||||
async def main():
|
||||
flow = ExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
|
||||
```
|
||||
|
||||
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
|
||||
|
||||
When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console.
|
||||
|
||||
### @start()
|
||||
|
||||
The `@start()` decorator is used to mark a method as the starting point of a Flow. When a Flow is started, all the methods decorated with `@start()` are executed in parallel. You can have multiple start methods in a Flow, and they will all be executed when the Flow is started.
|
||||
|
||||
### @listen()
|
||||
|
||||
The `@listen()` decorator is used to mark a method as a listener for the output of another task in the Flow. The method decorated with `@listen()` will be executed when the specified task emits an output. The method can access the output of the task it is listening to as an argument.
|
||||
|
||||
#### Usage
|
||||
|
||||
The `@listen()` decorator can be used in several ways:
|
||||
|
||||
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
|
||||
|
||||
```python
|
||||
@listen("generate_city")
|
||||
def generate_fun_fact(self, random_city):
|
||||
# Implementation
|
||||
```
|
||||
|
||||
2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
|
||||
```python
|
||||
@listen(generate_city)
|
||||
def generate_fun_fact(self, random_city):
|
||||
# Implementation
|
||||
```
|
||||
|
||||
## Flow State Management
|
||||
|
||||
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, allowing developers to choose the approach that best fits their application's needs.
|
||||
|
||||
### Unstructured State Management
|
||||
|
||||
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
class StructuredExampleFlow(Flow):
|
||||
|
||||
@start()
|
||||
async def first_method(self):
|
||||
self.state.message = "Hello from structured flow"
|
||||
self.state.counter = 0
|
||||
|
||||
@listen(first_method)
|
||||
async def second_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
|
||||
@listen(second_method)
|
||||
async def third_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated again"
|
||||
|
||||
print(f"State after third_method: {self.state}")
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
**Key Points:**
|
||||
|
||||
- **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints.
|
||||
- **Simplicity:** Ideal for straightforward workflows where state structure is minimal or varies significantly.
|
||||
|
||||
### Structured State Management
|
||||
|
||||
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
|
||||
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow[ExampleState]):
|
||||
|
||||
@start()
|
||||
async def first_method(self):
|
||||
self.state.message = "Hello from structured flow"
|
||||
|
||||
@listen(first_method)
|
||||
async def second_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
|
||||
@listen(second_method)
|
||||
async def third_method(self):
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated again"
|
||||
|
||||
print(f"State after third_method: {self.state}")
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
**Key Points:**
|
||||
|
||||
- **Defined Schema:** `ExampleState` clearly outlines the state structure, enhancing code readability and maintainability.
|
||||
- **Type Safety:** Leveraging Pydantic ensures that state attributes adhere to the specified types, reducing runtime errors.
|
||||
- **Auto-Completion:** IDEs can provide better auto-completion and error checking based on the defined state model.
|
||||
|
||||
### Choosing Between Unstructured and Structured State Management
|
||||
|
||||
- **Use Unstructured State Management when:**
|
||||
|
||||
- The workflow's state is simple or highly dynamic.
|
||||
- Flexibility is prioritized over strict state definitions.
|
||||
- Rapid prototyping is required without the overhead of defining schemas.
|
||||
|
||||
- **Use Structured State Management when:**
|
||||
- The workflow requires a well-defined and consistent state structure.
|
||||
- Type safety and validation are important for your application's reliability.
|
||||
- You want to leverage IDE features like auto-completion and type checking for better developer experience.
|
||||
|
||||
By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.
|
||||
|
||||
@@ -169,9 +169,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if not self._start_methods:
|
||||
raise ValueError("No start method defined")
|
||||
|
||||
for start_method in self._start_methods:
|
||||
result = await self._execute_method(self._methods[start_method])
|
||||
await self._execute_listeners(start_method, result)
|
||||
# Create tasks for all start methods
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
for start_method in self._start_methods
|
||||
]
|
||||
|
||||
# Run all start methods concurrently
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
async def _execute_start_method(self, start_method: str):
|
||||
result = await self._execute_method(self._methods[start_method])
|
||||
await self._execute_listeners(start_method, result)
|
||||
|
||||
async def _execute_method(self, method: Callable, *args, **kwargs):
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
|
||||
@@ -5,16 +5,16 @@ from openai import OpenAI
|
||||
|
||||
|
||||
class ExampleFlow(Flow):
|
||||
client = OpenAI()
|
||||
model = "gpt-4o-mini"
|
||||
|
||||
@start()
|
||||
def start_method(self):
|
||||
def generate_city(self):
|
||||
print("Starting flow")
|
||||
client = OpenAI()
|
||||
|
||||
response = client.chat.completions.create(
|
||||
model="gpt-4o-mini",
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Return the name of a random city in the world.",
|
||||
@@ -23,14 +23,25 @@ class ExampleFlow(Flow):
|
||||
)
|
||||
|
||||
random_city = response.choices[0].message.content
|
||||
print("random_city", random_city)
|
||||
|
||||
print("---- Random City ----")
|
||||
print(random_city)
|
||||
return random_city
|
||||
|
||||
@listen(start_method)
|
||||
def second_method(self, result):
|
||||
print("Second method received:", result)
|
||||
# print("Second city", result)
|
||||
@listen(generate_city)
|
||||
def generate_fun_fact(self, random_city):
|
||||
response = self.client.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": f"Tell me a fun fact about {random_city}",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
fun_fact = response.choices[0].message.content
|
||||
print("---- Fun Fact ----")
|
||||
print(fun_fact)
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
Reference in New Issue
Block a user