diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index a4e839073..690b38fe2 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -16,9 +16,6 @@ def start(condition=None): print(f"[start decorator] Decorating start method: {func.__name__}") func.__is_start_method__ = True if condition is not None: - print( - f"[start decorator] Adding condition: {condition} to start method: {func.__name__}" - ) if isinstance(condition, str): func.__trigger_methods__ = [condition] func.__condition_type__ = "OR" @@ -43,15 +40,9 @@ def start(condition=None): def listen(condition): def decorator(func): - print( - f"[listen decorator] Decorating listener: {func.__name__} with condition: {condition}" - ) if isinstance(condition, str): func.__trigger_methods__ = [condition] func.__condition_type__ = "OR" - print( - f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition}] with mode: OR" - ) elif ( isinstance(condition, dict) and "type" in condition @@ -59,15 +50,9 @@ def listen(condition): ): func.__trigger_methods__ = condition["methods"] func.__condition_type__ = condition["type"] - print( - f"[listen decorator] Set __trigger_methods__ for {func.__name__}: {func.__trigger_methods__} with mode: {func.__condition_type__}" - ) elif callable(condition) and hasattr(condition, "__name__"): func.__trigger_methods__ = [condition.__name__] func.__condition_type__ = "OR" - print( - f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition.__name__}] with mode: OR" - ) else: raise ValueError( "Condition must be a method, string, or a result of or_() or and_()" @@ -125,38 +110,20 @@ class FlowMeta(type): listeners = {} routers = {} - print(f"[FlowMeta] Processing class: {name}") for attr_name, attr_value in dct.items(): - print(f"[FlowMeta] Checking attribute: {attr_name}") if hasattr(attr_value, "__is_start_method__"): - print(f"[FlowMeta] Found start method: {attr_name}") start_methods.append(attr_name) if hasattr(attr_value, "__trigger_methods__"): methods = attr_value.__trigger_methods__ condition_type = getattr(attr_value, "__condition_type__", "OR") - print( - f"[FlowMeta] Conditions for start method {attr_name}:", methods - ) listeners[attr_name] = (condition_type, methods) elif hasattr(attr_value, "__trigger_methods__"): methods = attr_value.__trigger_methods__ condition_type = getattr(attr_value, "__condition_type__", "OR") - print(f"[FlowMeta] Conditions for listener {attr_name}:", methods) listeners[attr_name] = (condition_type, methods) elif hasattr(attr_value, "__is_router__"): - print( - f"[FlowMeta] Found router: {attr_name} for method: {attr_value.__router_for__}" - ) routers[attr_value.__router_for__] = attr_name - setattr(cls, "_start_methods", start_methods) - setattr(cls, "_listeners", listeners) - setattr(cls, "_routers", routers) - - print("[FlowMeta] ALL LISTENERS:", listeners) - print("[FlowMeta] START METHODS:", start_methods) - print("[FlowMeta] ROUTERS:", routers) - return cls @@ -177,12 +144,8 @@ class Flow(Generic[T], metaclass=FlowMeta): if callable(getattr(self, method_name)) and not method_name.startswith( "__" ): - print(f"[Flow.__init__] Adding method: {method_name}") self._methods[method_name] = getattr(self, method_name) - print("[Flow.__init__] All methods:", self._methods.keys()) - print("[Flow.__init__] Listeners:", self._listeners) - def _create_initial_state(self) -> T: print("[Flow._create_initial_state] Creating initial state") if self.initial_state is None: @@ -204,9 +167,6 @@ class Flow(Generic[T], metaclass=FlowMeta): for start_method in self._start_methods: print(f"[Flow.kickoff] Executing start method: {start_method}") result = await self._execute_method(self._methods[start_method]) - print( - f"[Flow.kickoff] Start method {start_method} completed. Executing listeners." - ) await self._execute_listeners(start_method, result) async def _execute_method(self, method: Callable, *args, **kwargs): @@ -224,21 +184,13 @@ class Flow(Generic[T], metaclass=FlowMeta): if trigger_method in self._routers: router_method = self._methods[self._routers[trigger_method]] - print(f"[Flow._execute_listeners] Executing router for {trigger_method}") path = await self._execute_method(router_method) - print(f"[Flow._execute_listeners] Router returned path: {path}") # Use the path as the new trigger method trigger_method = path for listener, (condition_type, methods) in self._listeners.items(): - print( - f"[Flow._execute_listeners] Checking listener: {listener}, condition: {condition_type}, methods: {methods}" - ) if condition_type == "OR": if trigger_method in methods: - print( - f"[Flow._execute_listeners] TRIGGERING METHOD: {listener} due to trigger: {trigger_method}" - ) listener_tasks.append( self._execute_single_listener(listener, result) ) @@ -247,18 +199,12 @@ class Flow(Generic[T], metaclass=FlowMeta): self._pending_and_listeners[listener] = set() self._pending_and_listeners[listener].add(trigger_method) if set(methods) == self._pending_and_listeners[listener]: - print( - f"[Flow._execute_listeners] All conditions met for listener: {listener}. Executing." - ) listener_tasks.append( self._execute_single_listener(listener, result) ) del self._pending_and_listeners[listener] # Run all listener tasks concurrently and wait for them to complete - print( - f"[Flow._execute_listeners] Executing {len(listener_tasks)} listener tasks" - ) await asyncio.gather(*listener_tasks) async def _execute_single_listener(self, listener: str, result: Any): @@ -267,18 +213,9 @@ class Flow(Generic[T], metaclass=FlowMeta): method = self._methods[listener] sig = inspect.signature(method) if len(sig.parameters) > 1: # More than just 'self' - print( - f"[Flow._execute_single_listener] Executing {listener} with result" - ) listener_result = await self._execute_method(method, result) else: - print( - f"[Flow._execute_single_listener] Executing {listener} without result" - ) listener_result = await self._execute_method(method) - print( - f"[Flow._execute_single_listener] {listener} completed, executing its listeners" - ) await self._execute_listeners(listener, listener_result) except Exception as e: print(f"[Flow._execute_single_listener] Error in method {listener}: {e}")