fixed flow output section

This commit is contained in:
Brandon Hancock
2024-09-23 14:47:11 -04:00
parent 3939d432aa
commit 50055a814c
5 changed files with 206 additions and 52 deletions

View File

@@ -22,18 +22,17 @@ Let's create a simple Flow where you will use OpenAI to generate a random city i
import asyncio
from crewai.flow.flow import Flow, listen, start
from openai import OpenAI
from litellm import completion
class ExampleFlow(Flow):
client = OpenAI()
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
response = self.client.chat.completions.create(
response = completion(
model=self.model,
messages=[
{
@@ -43,14 +42,14 @@ class ExampleFlow(Flow):
],
)
random_city = response.choices[0].message.content
print("---- Random City ----")
print(random_city)
random_city = response["choices"][0]["message"]["content"]
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = self.client.chat.completions.create(
response = completion(
model=self.model,
messages=[
{
@@ -60,18 +59,17 @@ class ExampleFlow(Flow):
],
)
fun_fact = response.choices[0].message.content
print("---- Fun Fact ----")
print(fun_fact)
fun_fact = response["choices"][0]["message"]["content"]
return fun_fact
async def main():
flow = ExampleFlow()
await flow.kickoff()
result = await flow.kickoff()
print(f"Generated fun fact: {result}")
asyncio.run(main())
```
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
@@ -105,6 +103,99 @@ The `@listen()` decorator can be used in several ways:
# Implementation
```
### Flow Output
Accessing and handling the output of a Flow is essential for integrating your AI workflows into larger applications or systems. CrewAI Flows provide straightforward mechanisms to retrieve the final output, access intermediate results, and manage the overall state of your Flow.
#### Retrieving the Final Output
When you run a Flow, the final output is determined by the last method that completes. The `kickoff()` method returns the output of this final method.
Here's how you can access the final output:
```python
import asyncio
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
def first_method(self):
return "Output from first_method"
@listen(first_method)
def second_method(self, first_output):
return f"Second method received: {first_output}"
async def main():
flow = OutputExampleFlow()
final_output = await flow.kickoff()
print("---- Final Output ----")
print(final_output)
asyncio.run(main())
```
In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow. The `kickoff()` method will return this final output, which is then printed to the console.
The output of the Flow will be:
```
---- Final Output ----
Second method received: Output from first_method
```
#### Accessing and Updating State
In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution.
Here's an example of how to update and access the state:
```python
import asyncio
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
counter: int = 0
message: str = ""
class StateExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
self.state.message = "Hello from first_method"
self.state.counter += 1
@listen(first_method)
def second_method(self):
self.state.message += " - updated by second_method"
self.state.counter += 1
return self.state.message
async def main():
flow = StateExampleFlow()
final_output = await flow.kickoff()
print("---- Final Output ----")
print(final_output)
print("---- Final State ----")
print(flow.state)
asyncio.run(main())
```
In this example, the state is updated by both `first_method` and `second_method`. After the Flow has run, you can access the final state to see the updates made by these methods.
The output of the Flow will be:
```
---- Final Output ----
Hello from first_method - updated by second_method
---- Final State ----
counter=2 message='Hello from first_method - updated by second_method'
```
By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems, while also maintaining and accessing the state throughout the Flow's execution.
## Flow State Management
Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, allowing developers to choose the approach that best fits their application's needs.
@@ -121,17 +212,17 @@ from crewai.flow.flow import Flow, listen, start
class UntructuredExampleFlow(Flow):
@start()
async def first_method(self):
def first_method(self):
self.state.message = "Hello from structured flow"
self.state.counter = 0
@listen(first_method)
async def second_method(self):
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
async def third_method(self):
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
@@ -170,16 +261,16 @@ class ExampleState(BaseModel):
class StructuredExampleFlow(Flow[ExampleState]):
@start()
async def first_method(self):
def first_method(self):
self.state.message = "Hello from structured flow"
@listen(first_method)
async def second_method(self):
def second_method(self):
self.state.counter += 1
self.state.message += " - updated"
@listen(second_method)
async def third_method(self):
def third_method(self):
self.state.counter += 1
self.state.message += " - updated again"
@@ -316,24 +407,24 @@ class ExampleState(BaseModel):
class RouterFlow(Flow[ExampleState]):
@start()
async def start_method(self):
def start_method(self):
print("Starting the structured flow")
random_boolean = random.choice([True, False])
self.state.success_flag = random_boolean
@router(start_method)
async def second_method(self):
def second_method(self):
if self.state.success_flag:
return "success"
else:
return "failed"
@listen("success")
async def third_method(self):
def third_method(self):
print("Third method running")
@listen("failed")
async def fourth_method(self):
def fourth_method(self):
print("Fourth method running")

View File

@@ -11,7 +11,6 @@ class PoemState(BaseModel):
poem: str = ""
class PoemFlow(Flow[PoemState]):
initial_state = PoemState
@start()
def generate_sentence_count(self):

View File

@@ -0,0 +1,52 @@
import asyncio
from crewai.flow.flow import Flow, listen, start
from litellm import completion
class ExampleFlow(Flow):
model = "gpt-4o-mini"
@start()
def generate_city(self):
print("Starting flow")
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": "Return the name of a random city in the world.",
},
],
)
random_city = response["choices"][0]["message"]["content"]
print(f"Random City: {random_city}")
return random_city
@listen(generate_city)
def generate_fun_fact(self, random_city):
response = completion(
model=self.model,
messages=[
{
"role": "user",
"content": f"Tell me a fun fact about {random_city}",
},
],
)
fun_fact = response["choices"][0]["message"]["content"]
return fun_fact
async def main():
flow = ExampleFlow()
result = await flow.kickoff()
print(f"Generated fun fact: {result}")
asyncio.run(main())

View File

@@ -2,7 +2,7 @@ import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
from pydantic import BaseModel, Field
from pydantic import BaseModel
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
@@ -140,7 +140,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._state = self._create_initial_state()
self._completed_methods: Set[str] = set()
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._flow_output = FlowOutput()
self._method_outputs: List[Any] = [] # List to store all method outputs
for method_name in dir(self):
if callable(getattr(self, method_name)) and not method_name.startswith(
@@ -162,7 +162,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
def state(self) -> T:
return self._state
async def kickoff(self):
@property
def method_outputs(self) -> List[Any]:
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
async def kickoff(self) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
@@ -175,6 +180,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Run all start methods concurrently
await asyncio.gather(*tasks)
# Return the final output (from the last executed method)
if self._method_outputs:
return self._method_outputs[-1]
else:
return None # Or raise an exception if no methods were executed
async def _execute_start_method(self, start_method: str):
result = await self._execute_method(self._methods[start_method])
await self._execute_listeners(start_method, result)
@@ -185,7 +196,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._flow_output.add_method_output(result)
self._method_outputs.append(result) # Store the output
return result
async def _execute_listeners(self, trigger_method: str, result: Any):
@@ -239,28 +250,3 @@ class Flow(Generic[T], metaclass=FlowMeta):
import traceback
traceback.print_exc()
class FlowOutput(BaseModel):
state: Dict[str, Any] = Field(
default_factory=dict, description="Final state of the flow"
)
method_outputs: List[Any] = Field(
default_factory=list, description="List of outputs from all executed methods"
)
@property
def final_output(self) -> Any:
"""Get the output of the last executed method."""
return self.method_outputs[-1] if self.method_outputs else None
def add_method_output(self, output: Any):
"""Add a method output to the list and update the final method name."""
self.method_outputs.append(output)
def update_state(self, new_state: Dict[str, Any]):
"""Update the flow state."""
self.state.update(new_state)
class Config:
arbitrary_types_allowed = True

View File

@@ -0,0 +1,26 @@
import asyncio
from crewai.flow.flow import Flow, listen, start
class OutputExampleFlow(Flow):
@start()
async def first_method(self):
return "Output from first_method"
@listen(first_method)
async def second_method(self, first_output):
return f"Second method received: {first_output}"
async def main():
flow = OutputExampleFlow()
outputs = await flow.kickoff()
print("---- Flow Outputs ----")
print(outputs)
print(" FLOW STATE POST RUN")
print(flow.state)
asyncio.run(main())