From 3939d432aa0084a7e12a5ca7271d9e4a76c01c2b Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Fri, 20 Sep 2024 15:27:55 -0400 Subject: [PATCH] add flow output --- docs/core-concepts/Flows.md | 11 ++++++++-- src/crewai/flow/flow.py | 43 +++++++++++++++++++++++++++++-------- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/docs/core-concepts/Flows.md b/docs/core-concepts/Flows.md index a20fbe958..7aa396d9f 100644 --- a/docs/core-concepts/Flows.md +++ b/docs/core-concepts/Flows.md @@ -358,8 +358,15 @@ Third method running - The easiest way to create flows with Crews is to use the `crewai create flow ` command. This will create a new CrewAI project for you that includes a folders for your Crews. -## - ``` ``` + +## Next Steps + +- Recommend checking out our flow examples in the CrewAI Examples repository to see more use cases. +- Currently, there are 4 flow examples: + - email auto responder flow + - lead score flow + - Write a book flow + - Meeting assistant flow diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 954902651..eb828fcca 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -2,11 +2,7 @@ import asyncio import inspect from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union -from pydantic import BaseModel - -# TODO: Allow people to pass results from one method to another and not just state -# TODO: Add in thiago and eduardo suggestions -# TODO: Add the ability to for start to handle _and and _or conditions +from pydantic import BaseModel, Field T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) @@ -144,6 +140,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._state = self._create_initial_state() self._completed_methods: Set[str] = set() self._pending_and_listeners: Dict[str, Set[str]] = {} + self._flow_output = FlowOutput() for method_name in dir(self): if callable(getattr(self, method_name)) and not method_name.startswith( @@ -183,10 +180,13 @@ class Flow(Generic[T], metaclass=FlowMeta): await self._execute_listeners(start_method, result) async def _execute_method(self, method: Callable, *args, **kwargs): - if asyncio.iscoroutinefunction(method): - return await method(*args, **kwargs) - else: - return method(*args, **kwargs) + result = ( + await method(*args, **kwargs) + if asyncio.iscoroutinefunction(method) + else method(*args, **kwargs) + ) + self._flow_output.add_method_output(result) + return result async def _execute_listeners(self, trigger_method: str, result: Any): listener_tasks = [] @@ -239,3 +239,28 @@ class Flow(Generic[T], metaclass=FlowMeta): import traceback traceback.print_exc() + + +class FlowOutput(BaseModel): + state: Dict[str, Any] = Field( + default_factory=dict, description="Final state of the flow" + ) + method_outputs: List[Any] = Field( + default_factory=list, description="List of outputs from all executed methods" + ) + + @property + def final_output(self) -> Any: + """Get the output of the last executed method.""" + return self.method_outputs[-1] if self.method_outputs else None + + def add_method_output(self, output: Any): + """Add a method output to the list and update the final method name.""" + self.method_outputs.append(output) + + def update_state(self, new_state: Dict[str, Any]): + """Update the flow state.""" + self.state.update(new_state) + + class Config: + arbitrary_types_allowed = True