diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 712c47657..d8e74fc08 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2668,6 +2668,11 @@ class Flow(Generic[T], metaclass=FlowMeta): message: The question or prompt to display to the user. timeout: Maximum seconds to wait for input. ``None`` means wait indefinitely. When timeout expires, returns ``None``. + Note: timeout is best-effort for the provider call -- + ``ask()`` returns ``None`` promptly, but the underlying + ``request_input()`` may continue running in a background + thread until it completes naturally. Network providers + should implement their own internal timeouts. metadata: Optional metadata to send to the input provider, such as user ID, channel, session context. The provider can use this to route the question to the right recipient. @@ -2723,14 +2728,22 @@ class Flow(Generic[T], metaclass=FlowMeta): try: if timeout is not None: - with ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit( - provider.request_input, message, self, metadata - ) - try: - raw = future.result(timeout=timeout) - except FuturesTimeoutError: - raw = None + # Manual executor management to avoid shutdown(wait=True) + # deadlock when the provider call outlives the timeout. + executor = ThreadPoolExecutor(max_workers=1) + future = executor.submit( + provider.request_input, message, self, metadata + ) + try: + raw = future.result(timeout=timeout) + except FuturesTimeoutError: + future.cancel() + raw = None + finally: + # wait=False so we don't block if the provider is still + # running (e.g. input() stuck waiting for user). + # cancel_futures=True cleans up any queued-but-not-started tasks. + executor.shutdown(wait=False, cancel_futures=True) else: raw = provider.request_input(message, self, metadata=metadata) except KeyboardInterrupt: