Skip to content

Commit

Permalink
Enhance Python LivenessScope API (#4497)
Browse files Browse the repository at this point in the history
The existing Python APIs for Liveness were difficult to use correctly
in cases more complex than a single script - this brings several parts
of the Java LivenessScope type to Python to enable more advanced use
cases. Rather than introduce a LivenessScopeStack, this adds an open()
method to LivenessScope itself. The liveness_scope() method will
continue to exist for simple use cases, where the scope will be closed
and released at the same time at the end of a block or function.

Also makes TableListenerHandle a JObjectWrapper so it can be more
easily unwrapped, can't accidentally be managed as a
LivePyObjectWrapper in Java.

Fixes #4471
  • Loading branch information
niloc132 committed Oct 6, 2023
1 parent 1c588df commit 03da796
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 53 deletions.
247 changes: 210 additions & 37 deletions py/server/deephaven/liveness_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,249 @@
#

"""This module gives the users a finer degree of control over when to clean up unreferenced nodes in the query update
graph instead of solely relying on garbage collection."""
graph instead of solely relying on garbage collection.
Examples:
Use the liveness_scope() function to produce a simple liveness scope that will be used only within a `with` expression
or as a decorator.
.. code-block:: python
with liveness_scope() as scope:
ticking_table = some_ticking_source()
table = ticking_table.snapshot().join(table=other_ticking_table, on=...)
scope.preserve(table)
return table
.. code-block:: python
@liveness_scope
def get_values():
ticking_table = some_ticking_source().last_by("Sym")
return dhnp.to_numpy(ticking_table)
Use the LivenessScope type for greater control, allowing a scope to be opened more than once, and to release the
resources that it manages when the scope will no longer be used.
.. code-block:: python
def make_table_and_scope(a: int):
scope = LivenessScope()
with scope.open():
ticking_table = some_ticking_source().where(f"A={a}")
return some_ticking_table, scope
t1, s1 = make_table_and_scope(1)
# .. wait for a while
s1.release()
t2, s2 = make_table_and_scope(2)
# etc
In both cases, the scope object has a few methods that can be used to more directly manage liveness referents:
* `scope.preserve(obj)` will preserve the given instance in the next scope outside the specified scope instance
* `scope.manange(obj)` will directly manage the given instance in the specified scope. Take care not to
double-manage objects when using in conjunction with a `with` block or function decorator.
* `scope.unmanage(obj)` will stop managing the given instance. This can be used regardless of how the instance
was managed to begin with.
"""
import contextlib

import jpy

from typing import Union, Iterator
from warnings import warn

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper

_JLivenessScopeStack = jpy.get_type("io.deephaven.engine.liveness.LivenessScopeStack")
_JLivenessScope = jpy.get_type("io.deephaven.engine.liveness.LivenessScope")
_JLivenessReferent = jpy.get_type("io.deephaven.engine.liveness.LivenessReferent")


class LivenessScope(JObjectWrapper):
"""A LivenessScope automatically manages reference counting of tables and other query resources that are
created in it. It implements the context manager protocol and thus can be used in the with statement.
def _push(scope: _JLivenessScope) -> None:
_JLivenessScopeStack.push(scope)


def _pop(scope: _JLivenessScope) -> None:
try:
_JLivenessScopeStack.pop(scope)
except Exception as e:
raise DHError(e, message="failed to pop the LivenessScope from the stack.")

Note, LivenessScope should not be instantiated directly but rather through the 'liveness_scope' function.
"""
j_object_type = _JLivenessScope

def __init__(self, j_scope: jpy.JType):
self.j_scope = j_scope
def _unwrap_to_liveness_referent(referent: Union[JObjectWrapper, jpy.JType]) -> jpy.JType:
if isinstance(referent, jpy.JType) and _JLivenessReferent.jclass.isInstance(referent):
return referent
if isinstance(referent, JObjectWrapper):
return _unwrap_to_liveness_referent(referent.j_object)
raise DHError("Provided referent isn't a LivenessReferent or a JObjectWrapper around one")

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class _BaseLivenessScope(JObjectWrapper):
"""
Internal base type for Java LivenessScope types in python.
"""
j_object_type = _JLivenessScope

def __init__(self):
self.j_scope = _JLivenessScope()

def close(self):
def _release(self) -> None:
"""Closes the LivenessScope and releases all the query graph resources.
Raises:
DHError
DHError if this instance has been released too many times
"""
if self.j_scope:
try:
_JLivenessScopeStack.pop(self.j_scope)
self.j_scope.release()
self.j_scope = None
except Exception as e:
raise DHError(e, message="failed to close the LivenessScope.")

@property
def j_object(self) -> jpy.JType:
return self.j_scope
try:
self.j_scope.release()
self.j_scope = None
except Exception as e:
raise DHError(e, message="failed to close the LivenessScope.")

def preserve(self, wrapper: JObjectWrapper) -> None:
def preserve(self, referent: Union[JObjectWrapper, jpy.JType]) -> None:
"""Preserves a query graph node (usually a Table) to keep it live for the outer scope.
Args:
wrapper (JObjectWrapper): a wrapped Java object such as a Table
referent (Union[JObjectWrapper, jpy.JType]): an object to preserve in the next outer liveness scope
Raises:
DHError
DHError if the object isn't a liveness node, or this instance isn't currently at the stop of the stack
"""
referent = _unwrap_to_liveness_referent(referent)
try:
# Ensure we are the current scope, throw DHError if we aren't
_JLivenessScopeStack.pop(self.j_scope)
_JLivenessScopeStack.peek().manage(wrapper.j_object)
_JLivenessScopeStack.push(self.j_scope)
except Exception as e:
raise DHError(e, message="failed to pop the current scope - is preserve() being called on the right scope?")

try:
# Manage the object in the next outer scope on this thread.
_JLivenessScopeStack.peek().manage(_unwrap_to_liveness_referent(referent))
except Exception as e:
raise DHError(e, message="failed to preserve a wrapped object in this LivenessScope.")
finally:
# Success or failure, restore the scope that was successfully popped
_JLivenessScopeStack.push(self.j_scope)

def manage(self, referent: Union[JObjectWrapper, jpy.JType]) -> None:
"""
Explicitly manage the given java object in this scope. Must only be passed a Java LivenessReferent, or
a Python wrapper around a LivenessReferent
Args:
referent (Union[JObjectWrapper, jpy.JType]): the object to manage by this scope
Returns: None
Raises:
DHError if the referent isn't a LivenessReferent, or if it is no longer live
"""
referent = _unwrap_to_liveness_referent(referent)
try:
self.j_scope.manage(referent)
except Exception as e:
raise DHError(e, message="failed to manage object")

def unmanage(self, referent: Union[JObjectWrapper, jpy.JType]) -> None:
"""
Explicitly unmanage the given java object from this scope. Must only be passed a Java LivenessReferent, or
a Python wrapper around a LivenessReferent
Args:
referent (Union[JObjectWrapper, jpy.JType]): the object to unmanage from this scope
Returns: None
Raises:
DHError if the referent isn't a LivenessReferent, or if it is no longer live
"""
referent = _unwrap_to_liveness_referent(referent)
try:
self.j_scope.unmanage(referent)
except Exception as e:
raise DHError(e, message="failed to unmanage object")

@property
def j_object(self) -> jpy.JType:
return self.j_scope


class SimpleLivenessScope(_BaseLivenessScope):
"""
A SimpleLivenessScope automatically manages reference counting of tables and other query resources that
are created in it. Instances are created through calling `liveness_scope()` in a `with` block or as a
function decorator, and will be disposed of automatically.
"""


class LivenessScope(_BaseLivenessScope):
"""A LivenessScope automatically manages reference counting of tables and other query resources that are
created in it. Any created instances must have `release()` called on them to correctly free resources
that have been managed.
"""

def __init__(self):
super().__init__()

def liveness_scope() -> LivenessScope:
"""Creates a LivenessScope for running a block of code.
def release(self) -> None:
"""Closes the LivenessScope and releases all the managed resources.
Raises:
DHError if this instance has been released too many times
"""
self._release()

@contextlib.contextmanager
def open(self) -> Iterator[None]:
"""
Uses this scope for the duration of a `with` block, automatically managing all resources created in the block.
Returns: None, to allow changes in the future.
"""
_push(self.j_scope)
try:
yield None
finally:
_pop(self.j_scope)


def is_liveness_referent(referent: Union[JObjectWrapper, jpy.JType]) -> bool:
"""
Returns True if the provided object is a LivenessReferent, and so can be managed by a LivenessScope.
Args:
referent: the object that may be a LivenessReferent
Returns:
a LivenessScope
True if the object is a LivenessReferent, False otherwise.
"""
if isinstance(referent, jpy.JType) and _JLivenessReferent.jclass.isInstance(referent):
return True
if isinstance(referent, JObjectWrapper):
return is_liveness_referent(referent.j_object)
return False


@contextlib.contextmanager
def liveness_scope() -> Iterator[SimpleLivenessScope]:
"""Creates and opens a LivenessScope for running a block of code. Use
this function to wrap a block of code using a `with` statement.
For the duration of the `with` block, the liveness scope will be open
and any liveness referents created will be manged by it automatically.
Yields:
a SimpleLivenessScope
"""
j_scope = _JLivenessScope()
_JLivenessScopeStack.push(j_scope)
return LivenessScope(j_scope=j_scope)
scope = SimpleLivenessScope()
_push(scope.j_scope)
try:
yield scope
finally:
_pop(scope.j_scope)
scope._release()
8 changes: 6 additions & 2 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from deephaven.table import Table
from deephaven.update_graph import UpdateGraph

_JPythonListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonListenerAdapter")
_JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter")
_JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate")
_JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonListenerTableUpdateDataReader")
Expand Down Expand Up @@ -335,8 +334,9 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str
return table_listener_handle


class TableListenerHandle:
class TableListenerHandle(JObjectWrapper):
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter

def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None):
"""Creates a new table listener handle.
Expand Down Expand Up @@ -408,3 +408,7 @@ def stop(self) -> None:
return
self.t.j_table.removeUpdateListener(self.listener)
self.started = False

@property
def j_object(self) -> jpy.JType:
return self.listener
Loading

0 comments on commit 03da796

Please sign in to comment.