From 50055a814cff92cb7aeaa065a5751315cb071914 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Mon, 23 Sep 2024 14:47:11 -0400 Subject: [PATCH] fixed flow output section --- docs/core-concepts/Flows.md | 135 +++++++++++++++++++++----- src/crewai/cli/templates/flow/main.py | 1 - src/crewai/flow/basic_flow.py | 52 ++++++++++ src/crewai/flow/flow.py | 44 +++------ src/crewai/flow/flow_output_test.py | 26 +++++ 5 files changed, 206 insertions(+), 52 deletions(-) create mode 100644 src/crewai/flow/basic_flow.py create mode 100644 src/crewai/flow/flow_output_test.py diff --git a/docs/core-concepts/Flows.md b/docs/core-concepts/Flows.md index 7aa396d9f..19a7bb7e3 100644 --- a/docs/core-concepts/Flows.md +++ b/docs/core-concepts/Flows.md @@ -22,18 +22,17 @@ Let's create a simple Flow where you will use OpenAI to generate a random city i import asyncio from crewai.flow.flow import Flow, listen, start -from openai import OpenAI +from litellm import completion class ExampleFlow(Flow): - client = OpenAI() model = "gpt-4o-mini" @start() def generate_city(self): print("Starting flow") - response = self.client.chat.completions.create( + response = completion( model=self.model, messages=[ { @@ -43,14 +42,14 @@ class ExampleFlow(Flow): ], ) - random_city = response.choices[0].message.content - print("---- Random City ----") - print(random_city) + random_city = response["choices"][0]["message"]["content"] + print(f"Random City: {random_city}") + return random_city @listen(generate_city) def generate_fun_fact(self, random_city): - response = self.client.chat.completions.create( + response = completion( model=self.model, messages=[ { @@ -60,18 +59,17 @@ class ExampleFlow(Flow): ], ) - fun_fact = response.choices[0].message.content - print("---- Fun Fact ----") - print(fun_fact) + fun_fact = response["choices"][0]["message"]["content"] + return fun_fact async def main(): flow = ExampleFlow() - await flow.kickoff() + result = await flow.kickoff() + print(f"Generated fun fact: {result}") asyncio.run(main()) - ``` In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task. @@ -105,6 +103,99 @@ The `@listen()` decorator can be used in several ways: # Implementation ``` +### Flow Output + +Accessing and handling the output of a Flow is essential for integrating your AI workflows into larger applications or systems. CrewAI Flows provide straightforward mechanisms to retrieve the final output, access intermediate results, and manage the overall state of your Flow. + +#### Retrieving the Final Output + +When you run a Flow, the final output is determined by the last method that completes. The `kickoff()` method returns the output of this final method. + +Here's how you can access the final output: + +```python +import asyncio +from crewai.flow.flow import Flow, listen, start + +class OutputExampleFlow(Flow): + @start() + def first_method(self): + return "Output from first_method" + + @listen(first_method) + def second_method(self, first_output): + return f"Second method received: {first_output}" + +async def main(): + flow = OutputExampleFlow() + final_output = await flow.kickoff() + print("---- Final Output ----") + print(final_output) + +asyncio.run(main()) +``` + +In this example, the `second_method` is the last method to complete, so its output will be the final output of the Flow. The `kickoff()` method will return this final output, which is then printed to the console. + +The output of the Flow will be: + +``` +---- Final Output ---- +Second method received: Output from first_method +``` + +#### Accessing and Updating State + +In addition to retrieving the final output, you can also access and update the state within your Flow. The state can be used to store and share data between different methods in the Flow. After the Flow has run, you can access the state to retrieve any information that was added or updated during the execution. + +Here's an example of how to update and access the state: + +```python +import asyncio +from crewai.flow.flow import Flow, listen, start +from pydantic import BaseModel + +class ExampleState(BaseModel): + counter: int = 0 + message: str = "" + +class StateExampleFlow(Flow[ExampleState]): + + @start() + def first_method(self): + self.state.message = "Hello from first_method" + self.state.counter += 1 + + @listen(first_method) + def second_method(self): + self.state.message += " - updated by second_method" + self.state.counter += 1 + return self.state.message + +async def main(): + flow = StateExampleFlow() + final_output = await flow.kickoff() + print("---- Final Output ----") + print(final_output) + print("---- Final State ----") + print(flow.state) + +asyncio.run(main()) +``` + +In this example, the state is updated by both `first_method` and `second_method`. After the Flow has run, you can access the final state to see the updates made by these methods. + +The output of the Flow will be: + +``` +---- Final Output ---- +Hello from first_method - updated by second_method +---- Final State ---- +counter=2 message='Hello from first_method - updated by second_method' +``` + +By ensuring that the final method's output is returned and providing access to the state, CrewAI Flows make it easy to integrate the results of your AI workflows into larger applications or systems, while also maintaining and accessing the state throughout the Flow's execution. + ## Flow State Management Managing state effectively is crucial for building reliable and maintainable AI workflows. CrewAI Flows provides robust mechanisms for both unstructured and structured state management, allowing developers to choose the approach that best fits their application's needs. @@ -121,17 +212,17 @@ from crewai.flow.flow import Flow, listen, start class UntructuredExampleFlow(Flow): @start() - async def first_method(self): + def first_method(self): self.state.message = "Hello from structured flow" self.state.counter = 0 @listen(first_method) - async def second_method(self): + def second_method(self): self.state.counter += 1 self.state.message += " - updated" @listen(second_method) - async def third_method(self): + def third_method(self): self.state.counter += 1 self.state.message += " - updated again" @@ -170,16 +261,16 @@ class ExampleState(BaseModel): class StructuredExampleFlow(Flow[ExampleState]): @start() - async def first_method(self): + def first_method(self): self.state.message = "Hello from structured flow" @listen(first_method) - async def second_method(self): + def second_method(self): self.state.counter += 1 self.state.message += " - updated" @listen(second_method) - async def third_method(self): + def third_method(self): self.state.counter += 1 self.state.message += " - updated again" @@ -316,24 +407,24 @@ class ExampleState(BaseModel): class RouterFlow(Flow[ExampleState]): @start() - async def start_method(self): + def start_method(self): print("Starting the structured flow") random_boolean = random.choice([True, False]) self.state.success_flag = random_boolean @router(start_method) - async def second_method(self): + def second_method(self): if self.state.success_flag: return "success" else: return "failed" @listen("success") - async def third_method(self): + def third_method(self): print("Third method running") @listen("failed") - async def fourth_method(self): + def fourth_method(self): print("Fourth method running") diff --git a/src/crewai/cli/templates/flow/main.py b/src/crewai/cli/templates/flow/main.py index b486b7a4d..bda89065d 100644 --- a/src/crewai/cli/templates/flow/main.py +++ b/src/crewai/cli/templates/flow/main.py @@ -11,7 +11,6 @@ class PoemState(BaseModel): poem: str = "" class PoemFlow(Flow[PoemState]): - initial_state = PoemState @start() def generate_sentence_count(self): diff --git a/src/crewai/flow/basic_flow.py b/src/crewai/flow/basic_flow.py new file mode 100644 index 000000000..6c126f9a9 --- /dev/null +++ b/src/crewai/flow/basic_flow.py @@ -0,0 +1,52 @@ +import asyncio + +from crewai.flow.flow import Flow, listen, start +from litellm import completion + + +class ExampleFlow(Flow): + model = "gpt-4o-mini" + + @start() + def generate_city(self): + print("Starting flow") + + response = completion( + model=self.model, + messages=[ + { + "role": "user", + "content": "Return the name of a random city in the world.", + }, + ], + ) + + random_city = response["choices"][0]["message"]["content"] + print(f"Random City: {random_city}") + + return random_city + + @listen(generate_city) + def generate_fun_fact(self, random_city): + response = completion( + model=self.model, + messages=[ + { + "role": "user", + "content": f"Tell me a fun fact about {random_city}", + }, + ], + ) + + fun_fact = response["choices"][0]["message"]["content"] + return fun_fact + + +async def main(): + flow = ExampleFlow() + result = await flow.kickoff() + + print(f"Generated fun fact: {result}") + + +asyncio.run(main()) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index eb828fcca..ab561349f 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -2,7 +2,7 @@ import asyncio import inspect from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) @@ -140,7 +140,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._state = self._create_initial_state() self._completed_methods: Set[str] = set() self._pending_and_listeners: Dict[str, Set[str]] = {} - self._flow_output = FlowOutput() + self._method_outputs: List[Any] = [] # List to store all method outputs for method_name in dir(self): if callable(getattr(self, method_name)) and not method_name.startswith( @@ -162,7 +162,12 @@ class Flow(Generic[T], metaclass=FlowMeta): def state(self) -> T: return self._state - async def kickoff(self): + @property + def method_outputs(self) -> List[Any]: + """Returns the list of all outputs from executed methods.""" + return self._method_outputs + + async def kickoff(self) -> Any: if not self._start_methods: raise ValueError("No start method defined") @@ -175,6 +180,12 @@ class Flow(Generic[T], metaclass=FlowMeta): # Run all start methods concurrently await asyncio.gather(*tasks) + # Return the final output (from the last executed method) + if self._method_outputs: + return self._method_outputs[-1] + else: + return None # Or raise an exception if no methods were executed + async def _execute_start_method(self, start_method: str): result = await self._execute_method(self._methods[start_method]) await self._execute_listeners(start_method, result) @@ -185,7 +196,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if asyncio.iscoroutinefunction(method) else method(*args, **kwargs) ) - self._flow_output.add_method_output(result) + self._method_outputs.append(result) # Store the output return result async def _execute_listeners(self, trigger_method: str, result: Any): @@ -239,28 +250,3 @@ class Flow(Generic[T], metaclass=FlowMeta): import traceback traceback.print_exc() - - -class FlowOutput(BaseModel): - state: Dict[str, Any] = Field( - default_factory=dict, description="Final state of the flow" - ) - method_outputs: List[Any] = Field( - default_factory=list, description="List of outputs from all executed methods" - ) - - @property - def final_output(self) -> Any: - """Get the output of the last executed method.""" - return self.method_outputs[-1] if self.method_outputs else None - - def add_method_output(self, output: Any): - """Add a method output to the list and update the final method name.""" - self.method_outputs.append(output) - - def update_state(self, new_state: Dict[str, Any]): - """Update the flow state.""" - self.state.update(new_state) - - class Config: - arbitrary_types_allowed = True diff --git a/src/crewai/flow/flow_output_test.py b/src/crewai/flow/flow_output_test.py new file mode 100644 index 000000000..67d6bc76a --- /dev/null +++ b/src/crewai/flow/flow_output_test.py @@ -0,0 +1,26 @@ +import asyncio + +from crewai.flow.flow import Flow, listen, start + + +class OutputExampleFlow(Flow): + @start() + async def first_method(self): + return "Output from first_method" + + @listen(first_method) + async def second_method(self, first_output): + return f"Second method received: {first_output}" + + +async def main(): + flow = OutputExampleFlow() + outputs = await flow.kickoff() + print("---- Flow Outputs ----") + print(outputs) + + print(" FLOW STATE POST RUN") + print(flow.state) + + +asyncio.run(main())