diff --git a/docs/core-concepts/Flows.md b/docs/core-concepts/Flows.md index e69de29bb..863da1d86 100644 --- a/docs/core-concepts/Flows.md +++ b/docs/core-concepts/Flows.md @@ -0,0 +1,17 @@ +# CrewAI Flows + +## Introduction + +CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allow developers to combine and coordinate coding tasks and Crews efficiently, providing a robust framework for building sophisticated AI automations. + +Flows allow you to create structured, event-driven workflows. They provide a seamless way to connect multiple tasks, manage state, and control the flow of execution in your AI applications. With Flows, you can easily design and implement multi-step processes that leverage the full potential of CrewAI's capabilities. + +1. **Simplified Workflow Creation**: Easily chain together multiple Crews and tasks to create complex AI workflows. + +2. **State Management**: Flows make it super easy to manage and share state between different tasks in your workflow. + +3. **Event-Driven Architecture**: Built on an event-driven model, allowing for dynamic and responsive workflows. + +4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows. + +## diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 0ff5d111b..f4e7bed32 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -13,7 +13,6 @@ T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) def start(condition=None): def decorator(func): - print(f"[start decorator] Decorating start method: {func.__name__}") func.__is_start_method__ = True if condition is not None: if isinstance(condition, str): @@ -64,9 +63,6 @@ def listen(condition): def router(method): def decorator(func): - print( - f"[router decorator] Decorating router: {func.__name__} for method: {method.__name__}" - ) func.__is_router__ = True func.__router_for__ = method.__name__ return func @@ -144,7 +140,6 @@ class Flow(Generic[T], metaclass=FlowMeta): return _FlowGeneric def __init__(self): - print("[Flow.__init__] Initializing Flow") self._methods: Dict[str, Callable] = {} self._state = self._create_initial_state() self._completed_methods: Set[str] = set() @@ -157,7 +152,6 @@ class Flow(Generic[T], metaclass=FlowMeta): self._methods[method_name] = getattr(self, method_name) def _create_initial_state(self) -> T: - print("[Flow._create_initial_state] Creating initial state") if self.initial_state is None and hasattr(self, "_initial_state_T"): return self._initial_state_T() # type: ignore if self.initial_state is None: @@ -172,26 +166,20 @@ class Flow(Generic[T], metaclass=FlowMeta): return self._state async def kickoff(self): - print("[Flow.kickoff] Starting kickoff") if not self._start_methods: raise ValueError("No start method defined") for start_method in self._start_methods: - print(f"[Flow.kickoff] Executing start method: {start_method}") result = await self._execute_method(self._methods[start_method]) await self._execute_listeners(start_method, result) async def _execute_method(self, method: Callable, *args, **kwargs): - print(f"[Flow._execute_method] Executing method: {method.__name__}") - if inspect.iscoroutinefunction(method): + if asyncio.iscoroutinefunction(method): return await method(*args, **kwargs) else: return method(*args, **kwargs) async def _execute_listeners(self, trigger_method: str, result: Any): - print( - f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}" - ) listener_tasks = [] if trigger_method in self._routers: @@ -220,14 +208,22 @@ class Flow(Generic[T], metaclass=FlowMeta): await asyncio.gather(*listener_tasks) async def _execute_single_listener(self, listener: str, result: Any): - print(f"[Flow._execute_single_listener] Executing listener: {listener}") try: method = self._methods[listener] sig = inspect.signature(method) - if len(sig.parameters) > 1: # More than just 'self' + params = list(sig.parameters.values()) + + # Exclude 'self' parameter + method_params = [p for p in params if p.name != "self"] + + if method_params: + # If listener expects parameters, pass the result listener_result = await self._execute_method(method, result) else: + # If listener does not expect parameters, call without arguments listener_result = await self._execute_method(method) + + # Execute listeners of this listener await self._execute_listeners(listener, listener_result) except Exception as e: print(f"[Flow._execute_single_listener] Error in method {listener}: {e}") diff --git a/src/crewai/flow/super_simple_ai_flow.py b/src/crewai/flow/super_simple_ai_flow.py new file mode 100644 index 000000000..5e1ee7541 --- /dev/null +++ b/src/crewai/flow/super_simple_ai_flow.py @@ -0,0 +1,41 @@ +import asyncio + +from crewai.flow.flow import Flow, listen, start +from openai import OpenAI + + +class ExampleFlow(Flow): + + @start() + def start_method(self): + print("Starting flow") + client = OpenAI() + + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + { + "role": "user", + "content": "Return the name of a random city in the world.", + }, + ], + ) + + random_city = response.choices[0].message.content + print("random_city", random_city) + + return random_city + + @listen(start_method) + def second_method(self, result): + print("Second method received:", result) + # print("Second city", result) + + +async def main(): + flow = ExampleFlow() + await flow.kickoff() + + +asyncio.run(main())