Skip to content

Commit

Permalink
Replace UCXX.__del__ with weakref.finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
pentschev committed Nov 11, 2024
1 parent f654ea5 commit d3578ad
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions python/distributed-ucxx/distributed_ucxx/ucxx.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,25 @@ def _close_comm(ref):
comm._closed = True


def _finalizer(endpoint: ucxx.Endpoint, resource_id: int) -> None:
"""UCXX comms object finalizer.
Attempt to close the UCXX endpoint if it's still alive, and deregister Dask
resource.
Parameters
----------
endpoint: ucx_api.UCXEndpoint
The endpoint to close.
resource_id: int
The unique ID of the resource returned by `_register_dask_resource` upon
registration.
"""
if endpoint is not None:
endpoint.abort()
_deregister_dask_resource(resource_id)


class UCXX(Comm):
"""Comm object using UCXX.
Expand Down Expand Up @@ -375,11 +394,18 @@ def __init__( # type: ignore[no-untyped-def]
else:
self._has_close_callback = False

self._resource_id = _register_dask_resource()
resource_id = _register_dask_resource()

logger.debug("UCX.__init__ %s", self)

weakref.finalize(self, _deregister_dask_resource, self._resource_id)
weakref.finalize(self, _finalizer, ep, resource_id)

def abort(self):
self._closed = True
if self._ep is not None:
self._ep.abort()
self._ep = None
_deregister_dask_resource(self._resource_id)

def __del__(self) -> None:
self.abort()
Expand Down

0 comments on commit d3578ad

Please sign in to comment.