mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-18 21:38:29 +00:00
Compare commits
3 Commits
bugfix/con
...
bugfix/flo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbe3e7cc5f | ||
|
|
876f71ade1 | ||
|
|
60d43c95b9 |
@@ -1,5 +1,3 @@
|
|||||||
# flow.py
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
||||||
@@ -120,6 +118,8 @@ class FlowMeta(type):
|
|||||||
methods = attr_value.__trigger_methods__
|
methods = attr_value.__trigger_methods__
|
||||||
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
||||||
listeners[attr_name] = (condition_type, methods)
|
listeners[attr_name] = (condition_type, methods)
|
||||||
|
|
||||||
|
# TODO: should we add a check for __condition_type__ 'AND'?
|
||||||
elif hasattr(attr_value, "__is_router__"):
|
elif hasattr(attr_value, "__is_router__"):
|
||||||
routers[attr_value.__router_for__] = attr_name
|
routers[attr_value.__router_for__] = attr_name
|
||||||
possible_returns = get_possible_return_constants(attr_value)
|
possible_returns = get_possible_return_constants(attr_value)
|
||||||
@@ -159,7 +159,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._methods: Dict[str, Callable] = {}
|
self._methods: Dict[str, Callable] = {}
|
||||||
self._state: T = self._create_initial_state()
|
self._state: T = self._create_initial_state()
|
||||||
self._completed_methods: Set[str] = set()
|
self._executed_methods: Set[str] = set()
|
||||||
|
self._scheduled_tasks: Set[str] = set()
|
||||||
self._pending_and_listeners: Dict[str, Set[str]] = {}
|
self._pending_and_listeners: Dict[str, Set[str]] = {}
|
||||||
self._method_outputs: List[Any] = [] # List to store all method outputs
|
self._method_outputs: List[Any] = [] # List to store all method outputs
|
||||||
|
|
||||||
@@ -216,17 +217,24 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
else:
|
else:
|
||||||
return None # Or raise an exception if no methods were executed
|
return None # Or raise an exception if no methods were executed
|
||||||
|
|
||||||
async def _execute_start_method(self, start_method: str) -> None:
|
async def _execute_start_method(self, start_method_name: str) -> None:
|
||||||
result = await self._execute_method(self._methods[start_method])
|
result = await self._execute_method(
|
||||||
await self._execute_listeners(start_method, result)
|
start_method_name, self._methods[start_method_name]
|
||||||
|
)
|
||||||
|
await self._execute_listeners(start_method_name, result)
|
||||||
|
|
||||||
async def _execute_method(self, method: Callable, *args: Any, **kwargs: Any) -> Any:
|
async def _execute_method(
|
||||||
|
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
|
||||||
|
) -> Any:
|
||||||
result = (
|
result = (
|
||||||
await method(*args, **kwargs)
|
await method(*args, **kwargs)
|
||||||
if asyncio.iscoroutinefunction(method)
|
if asyncio.iscoroutinefunction(method)
|
||||||
else method(*args, **kwargs)
|
else method(*args, **kwargs)
|
||||||
)
|
)
|
||||||
self._method_outputs.append(result) # Store the output
|
self._method_outputs.append(result) # Store the output
|
||||||
|
|
||||||
|
self._executed_methods.add(method_name)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
|
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
|
||||||
@@ -234,32 +242,40 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
|
|
||||||
if trigger_method in self._routers:
|
if trigger_method in self._routers:
|
||||||
router_method = self._methods[self._routers[trigger_method]]
|
router_method = self._methods[self._routers[trigger_method]]
|
||||||
path = await self._execute_method(router_method)
|
path = await self._execute_method(
|
||||||
|
trigger_method, router_method
|
||||||
|
) # TODO: Change or not?
|
||||||
# Use the path as the new trigger method
|
# Use the path as the new trigger method
|
||||||
trigger_method = path
|
trigger_method = path
|
||||||
|
|
||||||
for listener, (condition_type, methods) in self._listeners.items():
|
for listener_name, (condition_type, methods) in self._listeners.items():
|
||||||
if condition_type == "OR":
|
if condition_type == "OR":
|
||||||
if trigger_method in methods:
|
if trigger_method in methods:
|
||||||
listener_tasks.append(
|
if (
|
||||||
self._execute_single_listener(listener, result)
|
listener_name not in self._executed_methods
|
||||||
)
|
and listener_name not in self._scheduled_tasks
|
||||||
|
):
|
||||||
|
self._scheduled_tasks.add(listener_name)
|
||||||
|
listener_tasks.append(
|
||||||
|
self._execute_single_listener(listener_name, result)
|
||||||
|
)
|
||||||
elif condition_type == "AND":
|
elif condition_type == "AND":
|
||||||
if listener not in self._pending_and_listeners:
|
if all(method in self._executed_methods for method in methods):
|
||||||
self._pending_and_listeners[listener] = set()
|
if (
|
||||||
self._pending_and_listeners[listener].add(trigger_method)
|
listener_name not in self._executed_methods
|
||||||
if set(methods) == self._pending_and_listeners[listener]:
|
and listener_name not in self._scheduled_tasks
|
||||||
listener_tasks.append(
|
):
|
||||||
self._execute_single_listener(listener, result)
|
self._scheduled_tasks.add(listener_name)
|
||||||
)
|
listener_tasks.append(
|
||||||
del self._pending_and_listeners[listener]
|
self._execute_single_listener(listener_name, result)
|
||||||
|
)
|
||||||
|
|
||||||
# Run all listener tasks concurrently and wait for them to complete
|
# Run all listener tasks concurrently and wait for them to complete
|
||||||
await asyncio.gather(*listener_tasks)
|
await asyncio.gather(*listener_tasks)
|
||||||
|
|
||||||
async def _execute_single_listener(self, listener: str, result: Any) -> None:
|
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
|
||||||
try:
|
try:
|
||||||
method = self._methods[listener]
|
method = self._methods[listener_name]
|
||||||
sig = inspect.signature(method)
|
sig = inspect.signature(method)
|
||||||
params = list(sig.parameters.values())
|
params = list(sig.parameters.values())
|
||||||
|
|
||||||
@@ -268,15 +284,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
|
|
||||||
if method_params:
|
if method_params:
|
||||||
# If listener expects parameters, pass the result
|
# If listener expects parameters, pass the result
|
||||||
listener_result = await self._execute_method(method, result)
|
listener_result = await self._execute_method(
|
||||||
|
listener_name, method, result
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# If listener does not expect parameters, call without arguments
|
# If listener does not expect parameters, call without arguments
|
||||||
listener_result = await self._execute_method(method)
|
listener_result = await self._execute_method(listener_name, method)
|
||||||
|
|
||||||
|
# Remove from scheduled tasks after execution
|
||||||
|
self._scheduled_tasks.discard(listener_name)
|
||||||
|
|
||||||
# Execute listeners of this listener
|
# Execute listeners of this listener
|
||||||
await self._execute_listeners(listener, listener_result)
|
await self._execute_listeners(listener_name, listener_result)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[Flow._execute_single_listener] Error in method {listener}: {e}")
|
print(
|
||||||
|
f"[Flow._execute_single_listener] Error in method {listener_name}: {e}"
|
||||||
|
)
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|||||||
Reference in New Issue
Block a user