Skip to content

Commit

Permalink
Do not create a new task to rearm worker
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Oct 31, 2024
1 parent 10f7ae0 commit d719ce3
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def __init__(
weakref.finalize(self, event_loop.remove_reader, epoll_fd)
weakref.finalize(self, self.rsock.close)

self.blocking_asyncio_task = None
self.armed = False
self.blocking_asyncio_task = self.event_loop.create_task(self._arm_worker())
self.last_progress_time = time.monotonic() - self._progress_timeout
self.asyncio_task = event_loop.create_task(self._progress_with_timeout())

Expand All @@ -146,10 +147,9 @@ def _fd_reader_callback(self):
"""
self.worker.progress()

# Notice, we can safely overwrite `self.blocking_asyncio_task`
# since previous arm task is finished by now.
assert self.blocking_asyncio_task is None or self.blocking_asyncio_task.done()
self.blocking_asyncio_task = self.event_loop.create_task(self._arm_worker())
assert not self.armed

self.armed = False

async def _arm_worker(self):
"""Progress the worker and rearm.
Expand All @@ -163,6 +163,9 @@ async def _arm_worker(self):
# so that the asyncio's next state is epoll wait.
# See <https://github.com/rapidsai/ucx-py/issues/413>
while True:
if self.armed:
continue

self.last_progress_time = time.monotonic()
self.worker.progress()

Expand Down

0 comments on commit d719ce3

Please sign in to comment.