mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-05 06:59:23 +00:00
Enhance timeout handling in Flow class input requests
- Updated the `ask()` method to improve timeout management by manually managing the `ThreadPoolExecutor`, preventing potential deadlocks when the provider call exceeds the timeout duration. - Added clarifications in the documentation regarding the behavior of the timeout and the underlying request handling, ensuring better understanding for users.
This commit is contained in:
@@ -2668,6 +2668,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
message: The question or prompt to display to the user.
|
||||
timeout: Maximum seconds to wait for input. ``None`` means
|
||||
wait indefinitely. When timeout expires, returns ``None``.
|
||||
Note: timeout is best-effort for the provider call --
|
||||
``ask()`` returns ``None`` promptly, but the underlying
|
||||
``request_input()`` may continue running in a background
|
||||
thread until it completes naturally. Network providers
|
||||
should implement their own internal timeouts.
|
||||
metadata: Optional metadata to send to the input provider,
|
||||
such as user ID, channel, session context. The provider
|
||||
can use this to route the question to the right recipient.
|
||||
@@ -2723,14 +2728,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
|
||||
try:
|
||||
if timeout is not None:
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(
|
||||
provider.request_input, message, self, metadata
|
||||
)
|
||||
try:
|
||||
raw = future.result(timeout=timeout)
|
||||
except FuturesTimeoutError:
|
||||
raw = None
|
||||
# Manual executor management to avoid shutdown(wait=True)
|
||||
# deadlock when the provider call outlives the timeout.
|
||||
executor = ThreadPoolExecutor(max_workers=1)
|
||||
future = executor.submit(
|
||||
provider.request_input, message, self, metadata
|
||||
)
|
||||
try:
|
||||
raw = future.result(timeout=timeout)
|
||||
except FuturesTimeoutError:
|
||||
future.cancel()
|
||||
raw = None
|
||||
finally:
|
||||
# wait=False so we don't block if the provider is still
|
||||
# running (e.g. input() stuck waiting for user).
|
||||
# cancel_futures=True cleans up any queued-but-not-started tasks.
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
else:
|
||||
raw = provider.request_input(message, self, metadata=metadata)
|
||||
except KeyboardInterrupt:
|
||||
|
||||
Reference in New Issue
Block a user