mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-28 09:38:17 +00:00
Flow passing results again
This commit is contained in:
@@ -0,0 +1,17 @@
|
|||||||
|
# CrewAI Flows
|
||||||
|
|
||||||
|
## Introduction
|
||||||
|
|
||||||
|
CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allow developers to combine and coordinate coding tasks and Crews efficiently, providing a robust framework for building sophisticated AI automations.
|
||||||
|
|
||||||
|
Flows allow you to create structured, event-driven workflows. They provide a seamless way to connect multiple tasks, manage state, and control the flow of execution in your AI applications. With Flows, you can easily design and implement multi-step processes that leverage the full potential of CrewAI's capabilities.
|
||||||
|
|
||||||
|
1. **Simplified Workflow Creation**: Easily chain together multiple Crews and tasks to create complex AI workflows.
|
||||||
|
|
||||||
|
2. **State Management**: Flows make it super easy to manage and share state between different tasks in your workflow.
|
||||||
|
|
||||||
|
3. **Event-Driven Architecture**: Built on an event-driven model, allowing for dynamic and responsive workflows.
|
||||||
|
|
||||||
|
4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
|
||||||
|
|
||||||
|
##
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
|||||||
|
|
||||||
def start(condition=None):
|
def start(condition=None):
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
print(f"[start decorator] Decorating start method: {func.__name__}")
|
|
||||||
func.__is_start_method__ = True
|
func.__is_start_method__ = True
|
||||||
if condition is not None:
|
if condition is not None:
|
||||||
if isinstance(condition, str):
|
if isinstance(condition, str):
|
||||||
@@ -64,9 +63,6 @@ def listen(condition):
|
|||||||
|
|
||||||
def router(method):
|
def router(method):
|
||||||
def decorator(func):
|
def decorator(func):
|
||||||
print(
|
|
||||||
f"[router decorator] Decorating router: {func.__name__} for method: {method.__name__}"
|
|
||||||
)
|
|
||||||
func.__is_router__ = True
|
func.__is_router__ = True
|
||||||
func.__router_for__ = method.__name__
|
func.__router_for__ = method.__name__
|
||||||
return func
|
return func
|
||||||
@@ -144,7 +140,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
return _FlowGeneric
|
return _FlowGeneric
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
print("[Flow.__init__] Initializing Flow")
|
|
||||||
self._methods: Dict[str, Callable] = {}
|
self._methods: Dict[str, Callable] = {}
|
||||||
self._state = self._create_initial_state()
|
self._state = self._create_initial_state()
|
||||||
self._completed_methods: Set[str] = set()
|
self._completed_methods: Set[str] = set()
|
||||||
@@ -157,7 +152,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
self._methods[method_name] = getattr(self, method_name)
|
self._methods[method_name] = getattr(self, method_name)
|
||||||
|
|
||||||
def _create_initial_state(self) -> T:
|
def _create_initial_state(self) -> T:
|
||||||
print("[Flow._create_initial_state] Creating initial state")
|
|
||||||
if self.initial_state is None and hasattr(self, "_initial_state_T"):
|
if self.initial_state is None and hasattr(self, "_initial_state_T"):
|
||||||
return self._initial_state_T() # type: ignore
|
return self._initial_state_T() # type: ignore
|
||||||
if self.initial_state is None:
|
if self.initial_state is None:
|
||||||
@@ -172,26 +166,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
return self._state
|
return self._state
|
||||||
|
|
||||||
async def kickoff(self):
|
async def kickoff(self):
|
||||||
print("[Flow.kickoff] Starting kickoff")
|
|
||||||
if not self._start_methods:
|
if not self._start_methods:
|
||||||
raise ValueError("No start method defined")
|
raise ValueError("No start method defined")
|
||||||
|
|
||||||
for start_method in self._start_methods:
|
for start_method in self._start_methods:
|
||||||
print(f"[Flow.kickoff] Executing start method: {start_method}")
|
|
||||||
result = await self._execute_method(self._methods[start_method])
|
result = await self._execute_method(self._methods[start_method])
|
||||||
await self._execute_listeners(start_method, result)
|
await self._execute_listeners(start_method, result)
|
||||||
|
|
||||||
async def _execute_method(self, method: Callable, *args, **kwargs):
|
async def _execute_method(self, method: Callable, *args, **kwargs):
|
||||||
print(f"[Flow._execute_method] Executing method: {method.__name__}")
|
if asyncio.iscoroutinefunction(method):
|
||||||
if inspect.iscoroutinefunction(method):
|
|
||||||
return await method(*args, **kwargs)
|
return await method(*args, **kwargs)
|
||||||
else:
|
else:
|
||||||
return method(*args, **kwargs)
|
return method(*args, **kwargs)
|
||||||
|
|
||||||
async def _execute_listeners(self, trigger_method: str, result: Any):
|
async def _execute_listeners(self, trigger_method: str, result: Any):
|
||||||
print(
|
|
||||||
f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}"
|
|
||||||
)
|
|
||||||
listener_tasks = []
|
listener_tasks = []
|
||||||
|
|
||||||
if trigger_method in self._routers:
|
if trigger_method in self._routers:
|
||||||
@@ -220,14 +208,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
await asyncio.gather(*listener_tasks)
|
await asyncio.gather(*listener_tasks)
|
||||||
|
|
||||||
async def _execute_single_listener(self, listener: str, result: Any):
|
async def _execute_single_listener(self, listener: str, result: Any):
|
||||||
print(f"[Flow._execute_single_listener] Executing listener: {listener}")
|
|
||||||
try:
|
try:
|
||||||
method = self._methods[listener]
|
method = self._methods[listener]
|
||||||
sig = inspect.signature(method)
|
sig = inspect.signature(method)
|
||||||
if len(sig.parameters) > 1: # More than just 'self'
|
params = list(sig.parameters.values())
|
||||||
|
|
||||||
|
# Exclude 'self' parameter
|
||||||
|
method_params = [p for p in params if p.name != "self"]
|
||||||
|
|
||||||
|
if method_params:
|
||||||
|
# If listener expects parameters, pass the result
|
||||||
listener_result = await self._execute_method(method, result)
|
listener_result = await self._execute_method(method, result)
|
||||||
else:
|
else:
|
||||||
|
# If listener does not expect parameters, call without arguments
|
||||||
listener_result = await self._execute_method(method)
|
listener_result = await self._execute_method(method)
|
||||||
|
|
||||||
|
# Execute listeners of this listener
|
||||||
await self._execute_listeners(listener, listener_result)
|
await self._execute_listeners(listener, listener_result)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[Flow._execute_single_listener] Error in method {listener}: {e}")
|
print(f"[Flow._execute_single_listener] Error in method {listener}: {e}")
|
||||||
|
|||||||
41
src/crewai/flow/super_simple_ai_flow.py
Normal file
41
src/crewai/flow/super_simple_ai_flow.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from crewai.flow.flow import Flow, listen, start
|
||||||
|
from openai import OpenAI
|
||||||
|
|
||||||
|
|
||||||
|
class ExampleFlow(Flow):
|
||||||
|
|
||||||
|
@start()
|
||||||
|
def start_method(self):
|
||||||
|
print("Starting flow")
|
||||||
|
client = OpenAI()
|
||||||
|
|
||||||
|
response = client.chat.completions.create(
|
||||||
|
model="gpt-4o-mini",
|
||||||
|
messages=[
|
||||||
|
{"role": "system", "content": "You are a helpful assistant."},
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "Return the name of a random city in the world.",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
random_city = response.choices[0].message.content
|
||||||
|
print("random_city", random_city)
|
||||||
|
|
||||||
|
return random_city
|
||||||
|
|
||||||
|
@listen(start_method)
|
||||||
|
def second_method(self, result):
|
||||||
|
print("Second method received:", result)
|
||||||
|
# print("Second city", result)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
flow = ExampleFlow()
|
||||||
|
await flow.kickoff()
|
||||||
|
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
Reference in New Issue
Block a user