mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-17 12:58:31 +00:00
Compare commits
3 Commits
devin/1738
...
feat/add-i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9be9d7d61c | ||
|
|
d2db938d50 | ||
|
|
cf90bd3105 |
@@ -18,60 +18,63 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
|
|||||||
|
|
||||||
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
|
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
|
||||||
|
|
||||||
|
5. **Input Flexibility**: Flows can accept inputs to initialize or update their state, with different handling for structured and unstructured state management.
|
||||||
|
|
||||||
## Getting Started
|
## Getting Started
|
||||||
|
|
||||||
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
|
Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.
|
||||||
|
|
||||||
```python Code
|
### Passing Inputs to Flows
|
||||||
|
|
||||||
|
Flows can accept inputs to initialize or update their state before execution. The way inputs are handled depends on whether the flow uses structured or unstructured state management.
|
||||||
|
|
||||||
|
#### Structured State Management
|
||||||
|
|
||||||
|
In structured state management, the flow's state is defined using a Pydantic `BaseModel`. Inputs must match the model's schema, and any updates will overwrite the default values.
|
||||||
|
|
||||||
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, start
|
from crewai.flow.flow import Flow, listen, start
|
||||||
from dotenv import load_dotenv
|
from pydantic import BaseModel
|
||||||
from litellm import completion
|
|
||||||
|
|
||||||
|
class ExampleState(BaseModel):
|
||||||
|
counter: int = 0
|
||||||
|
message: str = ""
|
||||||
|
|
||||||
class ExampleFlow(Flow):
|
class StructuredExampleFlow(Flow[ExampleState]):
|
||||||
model = "gpt-4o-mini"
|
|
||||||
|
|
||||||
@start()
|
@start()
|
||||||
def generate_city(self):
|
def first_method(self):
|
||||||
print("Starting flow")
|
# Implementation
|
||||||
|
|
||||||
response = completion(
|
flow = StructuredExampleFlow()
|
||||||
model=self.model,
|
flow.kickoff(inputs={"counter": 10})
|
||||||
messages=[
|
```
|
||||||
{
|
|
||||||
"role": "user",
|
|
||||||
"content": "Return the name of a random city in the world.",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
random_city = response["choices"][0]["message"]["content"]
|
In this example, the `counter` is initialized to `10`, while `message` retains its default value.
|
||||||
print(f"Random City: {random_city}")
|
|
||||||
|
|
||||||
return random_city
|
#### Unstructured State Management
|
||||||
|
|
||||||
@listen(generate_city)
|
In unstructured state management, the flow's state is a dictionary. You can pass any dictionary to update the state.
|
||||||
def generate_fun_fact(self, random_city):
|
|
||||||
response = completion(
|
|
||||||
model=self.model,
|
|
||||||
messages=[
|
|
||||||
{
|
|
||||||
"role": "user",
|
|
||||||
"content": f"Tell me a fun fact about {random_city}",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
fun_fact = response["choices"][0]["message"]["content"]
|
```python
|
||||||
return fun_fact
|
from crewai.flow.flow import Flow, listen, start
|
||||||
|
|
||||||
|
class UnstructuredExampleFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def first_method(self):
|
||||||
|
# Implementation
|
||||||
|
|
||||||
|
flow = UnstructuredExampleFlow()
|
||||||
|
flow.kickoff(inputs={"counter": 5, "message": "Initial message"})
|
||||||
|
```
|
||||||
|
|
||||||
flow = ExampleFlow()
|
Here, both `counter` and `message` are updated based on the provided inputs.
|
||||||
result = flow.kickoff()
|
|
||||||
|
|
||||||
print(f"Generated fun fact: {result}")
|
**Note:** Ensure that inputs for structured state management adhere to the defined schema to avoid validation errors.
|
||||||
|
|
||||||
|
### Example Flow
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Existing example code
|
||||||
```
|
```
|
||||||
|
|
||||||
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
|
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
|
||||||
@@ -94,14 +97,14 @@ The `@listen()` decorator can be used in several ways:
|
|||||||
|
|
||||||
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
|
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
@listen("generate_city")
|
@listen("generate_city")
|
||||||
def generate_fun_fact(self, random_city):
|
def generate_fun_fact(self, random_city):
|
||||||
# Implementation
|
# Implementation
|
||||||
```
|
```
|
||||||
|
|
||||||
2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
|
2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.
|
||||||
```python Code
|
```python
|
||||||
@listen(generate_city)
|
@listen(generate_city)
|
||||||
def generate_fun_fact(self, random_city):
|
def generate_fun_fact(self, random_city):
|
||||||
# Implementation
|
# Implementation
|
||||||
@@ -118,7 +121,7 @@ When you run a Flow, the final output is determined by the last method that comp
|
|||||||
Here's how you can access the final output:
|
Here's how you can access the final output:
|
||||||
|
|
||||||
<CodeGroup>
|
<CodeGroup>
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, start
|
from crewai.flow.flow import Flow, listen, start
|
||||||
|
|
||||||
class OutputExampleFlow(Flow):
|
class OutputExampleFlow(Flow):
|
||||||
@@ -130,18 +133,17 @@ class OutputExampleFlow(Flow):
|
|||||||
def second_method(self, first_output):
|
def second_method(self, first_output):
|
||||||
return f"Second method received: {first_output}"
|
return f"Second method received: {first_output}"
|
||||||
|
|
||||||
|
|
||||||
flow = OutputExampleFlow()
|
flow = OutputExampleFlow()
|
||||||
final_output = flow.kickoff()
|
final_output = flow.kickoff()
|
||||||
|
|
||||||
print("---- Final Output ----")
|
print("---- Final Output ----")
|
||||||
print(final_output)
|
print(final_output)
|
||||||
````
|
```
|
||||||
|
|
||||||
``` text Output
|
```text
|
||||||
---- Final Output ----
|
---- Final Output ----
|
||||||
Second method received: Output from first_method
|
Second method received: Output from first_method
|
||||||
````
|
```
|
||||||
|
|
||||||
</CodeGroup>
|
</CodeGroup>
|
||||||
|
|
||||||
@@ -156,7 +158,7 @@ Here's an example of how to update and access the state:
|
|||||||
|
|
||||||
<CodeGroup>
|
<CodeGroup>
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, start
|
from crewai.flow.flow import Flow, listen, start
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@@ -184,7 +186,7 @@ print("Final State:")
|
|||||||
print(flow.state)
|
print(flow.state)
|
||||||
```
|
```
|
||||||
|
|
||||||
```text Output
|
```text
|
||||||
Final Output: Hello from first_method - updated by second_method
|
Final Output: Hello from first_method - updated by second_method
|
||||||
Final State:
|
Final State:
|
||||||
counter=2 message='Hello from first_method - updated by second_method'
|
counter=2 message='Hello from first_method - updated by second_method'
|
||||||
@@ -208,10 +210,10 @@ allowing developers to choose the approach that best fits their application's ne
|
|||||||
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
|
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
|
||||||
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
|
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, start
|
from crewai.flow.flow import Flow, listen, start
|
||||||
|
|
||||||
class UntructuredExampleFlow(Flow):
|
class UnstructuredExampleFlow(Flow):
|
||||||
|
|
||||||
@start()
|
@start()
|
||||||
def first_method(self):
|
def first_method(self):
|
||||||
@@ -230,8 +232,7 @@ class UntructuredExampleFlow(Flow):
|
|||||||
|
|
||||||
print(f"State after third_method: {self.state}")
|
print(f"State after third_method: {self.state}")
|
||||||
|
|
||||||
|
flow = UnstructuredExampleFlow()
|
||||||
flow = UntructuredExampleFlow()
|
|
||||||
flow.kickoff()
|
flow.kickoff()
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -245,16 +246,14 @@ flow.kickoff()
|
|||||||
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
|
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
|
||||||
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
|
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, start
|
from crewai.flow.flow import Flow, listen, start
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
class ExampleState(BaseModel):
|
class ExampleState(BaseModel):
|
||||||
counter: int = 0
|
counter: int = 0
|
||||||
message: str = ""
|
message: str = ""
|
||||||
|
|
||||||
|
|
||||||
class StructuredExampleFlow(Flow[ExampleState]):
|
class StructuredExampleFlow(Flow[ExampleState]):
|
||||||
|
|
||||||
@start()
|
@start()
|
||||||
@@ -273,7 +272,6 @@ class StructuredExampleFlow(Flow[ExampleState]):
|
|||||||
|
|
||||||
print(f"State after third_method: {self.state}")
|
print(f"State after third_method: {self.state}")
|
||||||
|
|
||||||
|
|
||||||
flow = StructuredExampleFlow()
|
flow = StructuredExampleFlow()
|
||||||
flow.kickoff()
|
flow.kickoff()
|
||||||
```
|
```
|
||||||
@@ -307,7 +305,7 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger
|
|||||||
|
|
||||||
<CodeGroup>
|
<CodeGroup>
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, listen, or_, start
|
from crewai.flow.flow import Flow, listen, or_, start
|
||||||
|
|
||||||
class OrExampleFlow(Flow):
|
class OrExampleFlow(Flow):
|
||||||
@@ -324,13 +322,11 @@ class OrExampleFlow(Flow):
|
|||||||
def logger(self, result):
|
def logger(self, result):
|
||||||
print(f"Logger: {result}")
|
print(f"Logger: {result}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
flow = OrExampleFlow()
|
flow = OrExampleFlow()
|
||||||
flow.kickoff()
|
flow.kickoff()
|
||||||
```
|
```
|
||||||
|
|
||||||
```text Output
|
```text
|
||||||
Logger: Hello from the start method
|
Logger: Hello from the start method
|
||||||
Logger: Hello from the second method
|
Logger: Hello from the second method
|
||||||
```
|
```
|
||||||
@@ -346,7 +342,7 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge
|
|||||||
|
|
||||||
<CodeGroup>
|
<CodeGroup>
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
from crewai.flow.flow import Flow, and_, listen, start
|
from crewai.flow.flow import Flow, and_, listen, start
|
||||||
|
|
||||||
class AndExampleFlow(Flow):
|
class AndExampleFlow(Flow):
|
||||||
@@ -368,7 +364,7 @@ flow = AndExampleFlow()
|
|||||||
flow.kickoff()
|
flow.kickoff()
|
||||||
```
|
```
|
||||||
|
|
||||||
```text Output
|
```text
|
||||||
---- Logger ----
|
---- Logger ----
|
||||||
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
|
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
|
||||||
```
|
```
|
||||||
@@ -385,7 +381,7 @@ You can specify different routes based on the output of the method, allowing you
|
|||||||
|
|
||||||
<CodeGroup>
|
<CodeGroup>
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
import random
|
import random
|
||||||
from crewai.flow.flow import Flow, listen, router, start
|
from crewai.flow.flow import Flow, listen, router, start
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -416,12 +412,11 @@ class RouterFlow(Flow[ExampleState]):
|
|||||||
def fourth_method(self):
|
def fourth_method(self):
|
||||||
print("Fourth method running")
|
print("Fourth method running")
|
||||||
|
|
||||||
|
|
||||||
flow = RouterFlow()
|
flow = RouterFlow()
|
||||||
flow.kickoff()
|
flow.kickoff()
|
||||||
```
|
```
|
||||||
|
|
||||||
```text Output
|
```text
|
||||||
Starting the structured flow
|
Starting the structured flow
|
||||||
Third method running
|
Third method running
|
||||||
Fourth method running
|
Fourth method running
|
||||||
@@ -484,7 +479,7 @@ The `main.py` file is where you create your flow and connect the crews together.
|
|||||||
|
|
||||||
Here's an example of how you can connect the `poem_crew` in the `main.py` file:
|
Here's an example of how you can connect the `poem_crew` in the `main.py` file:
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
from random import randint
|
from random import randint
|
||||||
|
|
||||||
@@ -612,7 +607,7 @@ CrewAI provides two convenient methods to generate plots of your flows:
|
|||||||
|
|
||||||
If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow.
|
If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow.
|
||||||
|
|
||||||
```python Code
|
```python
|
||||||
# Assuming you have a flow instance
|
# Assuming you have a flow instance
|
||||||
flow.plot("my_flow_plot")
|
flow.plot("my_flow_plot")
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -1,8 +1,20 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Callable,
|
||||||
|
Dict,
|
||||||
|
Generic,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Set,
|
||||||
|
Type,
|
||||||
|
TypeVar,
|
||||||
|
Union,
|
||||||
|
cast,
|
||||||
|
)
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel, ValidationError
|
||||||
|
|
||||||
from crewai.flow.flow_visualizer import plot_flow
|
from crewai.flow.flow_visualizer import plot_flow
|
||||||
from crewai.flow.utils import get_possible_return_constants
|
from crewai.flow.utils import get_possible_return_constants
|
||||||
@@ -191,10 +203,74 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
"""Returns the list of all outputs from executed methods."""
|
"""Returns the list of all outputs from executed methods."""
|
||||||
return self._method_outputs
|
return self._method_outputs
|
||||||
|
|
||||||
def kickoff(self) -> Any:
|
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
|
||||||
|
"""
|
||||||
|
Initializes or updates the state with the provided inputs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Dictionary of inputs to initialize or update the state.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If inputs do not match the structured state model.
|
||||||
|
TypeError: If state is neither a BaseModel instance nor a dictionary.
|
||||||
|
"""
|
||||||
|
if isinstance(self._state, BaseModel):
|
||||||
|
# Structured state management
|
||||||
|
try:
|
||||||
|
# Define a function to create the dynamic class
|
||||||
|
def create_model_with_extra_forbid(
|
||||||
|
base_model: Type[BaseModel],
|
||||||
|
) -> Type[BaseModel]:
|
||||||
|
class ModelWithExtraForbid(base_model): # type: ignore
|
||||||
|
model_config = base_model.model_config.copy()
|
||||||
|
model_config["extra"] = "forbid"
|
||||||
|
|
||||||
|
return ModelWithExtraForbid
|
||||||
|
|
||||||
|
# Create the dynamic class
|
||||||
|
ModelWithExtraForbid = create_model_with_extra_forbid(
|
||||||
|
self._state.__class__
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a new instance using the combined state and inputs
|
||||||
|
self._state = cast(
|
||||||
|
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
|
||||||
|
)
|
||||||
|
|
||||||
|
except ValidationError as e:
|
||||||
|
raise ValueError(f"Invalid inputs for structured state: {e}") from e
|
||||||
|
elif isinstance(self._state, dict):
|
||||||
|
# Unstructured state management
|
||||||
|
self._state.update(inputs)
|
||||||
|
else:
|
||||||
|
raise TypeError("State must be a BaseModel instance or a dictionary.")
|
||||||
|
|
||||||
|
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||||
|
"""
|
||||||
|
Starts the execution of the flow synchronously.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Optional dictionary of inputs to initialize or update the state.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The final output from the flow execution.
|
||||||
|
"""
|
||||||
|
if inputs is not None:
|
||||||
|
self._initialize_state(inputs)
|
||||||
return asyncio.run(self.kickoff_async())
|
return asyncio.run(self.kickoff_async())
|
||||||
|
|
||||||
async def kickoff_async(self) -> Any:
|
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||||
|
"""
|
||||||
|
Starts the execution of the flow asynchronously.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Optional dictionary of inputs to initialize or update the state.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The final output from the flow execution.
|
||||||
|
"""
|
||||||
|
if inputs is not None:
|
||||||
|
self._initialize_state(inputs)
|
||||||
if not self._start_methods:
|
if not self._start_methods:
|
||||||
raise ValueError("No start method defined")
|
raise ValueError("No start method defined")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user