Windows, concurrent.futures usage and control-c #533
Replies: 4 comments
-
Have you tried pressing ctrl+break? Windows works differently here. |
Beta Was this translation helpful? Give feedback.
-
ctrl+break seems to terminate the whole Python process. What I'll try is to pass a 1 second timeout to |
Beta Was this translation helpful? Give feedback.
-
Hi @agronholm, I've managed to work around this. For us, it doesn't have to be merged upstream here in anyio, but if you're willing to accept some patches, I'm very happy to create a merge requests. Basically, what we have is a sync API on top of an async library that works by submitting everything into a blocking portal. The sync API is used in a REPL, so users expect control-c to be working, and expected of course is that when a So, first, I had to replace all occurrences of def _wait_for_future(
future: Future[_T], on_cancel: Callable[[], None] | None = None
) -> _T:
"""
Wait for a `Future` to complete, and return the result. if Control-c was
pressed while waiting for the future, then the future will be cancelled.
:param on_cancel: When given, call this instead of cancelling the given
future when `KeyboardInterrupt` is pressed.
"""
try:
if sys.platform == "win32":
# On windows, we poll the future using a timeout, because
# `future.result()` can't be interrupted using control-c.
while True:
try:
return future.result(timeout=0.5) # Wait for the result.
except concurrent.futures.TimeoutError:
if future.done() and future.exception():
# Maybe the future itself contained a `TimeoutError`
# exception.
return future.result()
continue
else:
return future.result()
except KeyboardInterrupt:
# If we got a KeyboardInterrupt in the sync thread here, cancel the
# underlying async call.
if on_cancel is not None:
on_cancel()
else:
future.cancel()
raise Secondly, I adjusted the class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager):
"""
Copied from `anyio.from_thread._BlockingAsyncContextManager`.
Modified to support cancellation using control-c (also for Windows).
"""
_enter_future: Future
_exit_future: Future
_exit_event: Event
_exit_exc_info: tuple[
type[BaseException] | None, BaseException | None, TracebackType | None
] = (None, None, None)
def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
self._async_cm = async_cm
self._portal = portal
async def run_async_cm(self) -> bool | None:
try:
self._exit_event = Event()
value = await self._async_cm.__aenter__()
except BaseException as exc:
self._enter_future.set_exception(exc)
raise
else:
self._enter_future.set_result(value)
try:
# Wait for the sync context manager to exit.
# This next statement can raise `get_cancelled_exc_class()` if
# something went wrong in a task group in this async context
# manager.
await self._exit_event.wait()
finally:
# In case of cancellation, it could be that we end up here before
# `_BlockingAsyncContextManager.__exit__` is called, and an
# `_exit_exc_info` has been set.
result = await self._async_cm.__aexit__(*self._exit_exc_info)
return result
def __enter__(self) -> T_co:
self._enter_future = Future()
self._exit_future = self._portal.start_task_soon(self.run_async_cm)
try:
cm = _wait_for_future(
self._enter_future, on_cancel=self._exit_future.cancel
)
except KeyboardInterrupt:
# `__enter__` failed. `__exit__` won't be called anymore. Wait for
# task to complete.
try:
_wait_for_future(self._exit_future)
except concurrent.futures.CancelledError:
pass
raise
return cast(T_co, cm)
def __exit__(
self,
__exc_type: type[BaseException] | None,
__exc_value: BaseException | None,
__traceback: TracebackType | None,
) -> bool | None:
self._exit_exc_info = __exc_type, __exc_value, __traceback
self._portal.call(self._exit_event.set)
return _wait_for_future(self._exit_future) We have a few more changes in a helper on top of |
Beta Was this translation helpful? Give feedback.
-
FYI: I modified the above snippet further, so that no exceptions will escape the task spawned in the blocking portal. The following will catch any exception in the task (including class _BlockingAsyncContextManager(
Generic[T_co], AbstractContextManager # type:ignore[type-arg]
):
"""
Copied from `anyio.from_thread._BlockingAsyncContextManager`.
Modified to support cancellation using control-c (also for Windows).
See:
- https://github.com/agronholm/anyio/issues/471
- https://github.com/agronholm/anyio/issues/471
- https://github.com/agronholm/anyio/discussions/533
"""
_enter_future: Future[T_co]
_exit_future: Future[bool | None]
_exit_event: Event
_exit_exc_info: tuple[
type[BaseException] | None, BaseException | None, TracebackType | None
] = (None, None, None)
def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal):
self._async_cm = async_cm
self._portal = portal
async def run_async_cm(self) -> None:
# This code is modified, compared to upstream so that exceptions are
# never propagated within the event loop. They are always returned to
# the caller of the blocking portal (the main thread).
try:
self._exit_event = Event()
value = await self._async_cm.__aenter__()
except BaseException as exc:
self._enter_future.set_exception(exc)
else:
self._enter_future.set_result(value)
try:
# Wait for the sync context manager to exit.
# This next statement can raise `get_cancelled_exc_class()` if
# something went wrong in a task group in this async context
# manager.
await self._exit_event.wait()
finally:
# In case of cancellation, it could be that we end up here before
# `_BlockingAsyncContextManager.__exit__` is called, and an
# `_exit_exc_info` has been set.
try:
result = await self._async_cm.__aexit__(*self._exit_exc_info)
self._exit_future.set_result(result)
except BaseException as exc:
self._exit_future.set_exception(exc)
def __enter__(self) -> T_co:
self._enter_future = Future()
self._exit_future = Future()
self._portal.start_task_soon(self.run_async_cm)
try:
def cancel() -> None:
self._exit_future.cancel()
cm = _wait_for_future(self._enter_future, on_cancel=cancel)
except KeyboardInterrupt:
# `__enter__` failed. `__exit__` won't be called anymore. Wait for
# task to complete.
try:
_wait_for_future(self._exit_future)
except concurrent.futures.CancelledError:
pass
raise
return cm
def __exit__(
self,
__exc_type: type[BaseException] | None,
__exc_value: BaseException | None,
__traceback: TracebackType | None,
) -> bool | None:
self._exit_exc_info = __exc_type, __exc_value, __traceback
self._portal.call(self._exit_event.set)
return _wait_for_future(self._exit_future) |
Beta Was this translation helpful? Give feedback.
-
We are using the blocking portal to run async code in a thread. Here we use both
portal.start_task_soon
andportal.enter_async_context
. However, on Windows, in the main thread, pressing control-c while waiting for the task to complete doesn't work.The issue is possibly not in anyio itself, but in
concurrent.futures
, which is used here. Consider this snippet:On Windows, pressing control-c has no effect while waiting for the future. Unlike Windows/Mac, we don't get a
KeyboardInterrupt
. (We'd like to catch this exception for cancelling the future, which will cancel the async task.)Does anyone here know where this difference is coming from, and how we could work around it?
Beta Was this translation helpful? Give feedback.
All reactions