diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java index 28a03bd729e..6f00e429b61 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java @@ -8,57 +8,52 @@ import io.deephaven.io.logger.Logger; import io.deephaven.util.annotations.ScriptApi; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.jpy.PyObject; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.Objects; @ScriptApi public class PythonInputTableStatusListenerAdapter implements InputTableStatusListener { private static final Logger log = LoggerFactory.getLogger(PythonInputTableStatusListenerAdapter.class); - private final PyObject pyOnSuccessCallable; + private final PyObject pyOnSuccessCallback; private final PyObject pyOnErrorCallback; /** * Create a Python InputTable status listener. * - * @param pyOnSuccessListener The Python onSuccess callback function. - * @param pyOnErrorCallback The Python onError callback function. + * @param pyOnSuccessCallback The Python onSuccess callback function. + * @param pyOnErrorCallback The Python onError callback function. */ - private PythonInputTableStatusListenerAdapter(@Nullable PyObject pyOnSuccessListener, - @Nonnull PyObject pyOnErrorCallback) { - this.pyOnSuccessCallable = pyOnSuccessListener; + private PythonInputTableStatusListenerAdapter(@Nullable PyObject pyOnSuccessCallback, + @NotNull PyObject pyOnErrorCallback) { + this.pyOnSuccessCallback = pyOnSuccessCallback; this.pyOnErrorCallback = pyOnErrorCallback; } - public static PythonInputTableStatusListenerAdapter create(@Nullable PyObject pyOnSuccessListener, - @Nonnull PyObject pyOnErrorCallback) { - return new PythonInputTableStatusListenerAdapter(pyOnSuccessListener, + public static PythonInputTableStatusListenerAdapter create(@Nullable PyObject pyOnSuccessCallback, + @NotNull PyObject pyOnErrorCallback) { + return new PythonInputTableStatusListenerAdapter(pyOnSuccessCallback, Objects.requireNonNull(pyOnErrorCallback, "Python on_error callback cannot be None")); } @Override public void onError(Throwable originalException) { - if (!pyOnErrorCallback.isNone()) { - try { - pyOnErrorCallback.call("__call__", ExceptionUtils.getStackTrace(originalException)); - } catch (Throwable e) { - // If the Python onFailure callback fails, log the new exception - // and continue with the original exception. - log.error().append("Python on_error callback failed: ").append(e).endl(); - } - } else { - log.error().append("Python on_error callback is None: ") - .append(ExceptionUtils.getStackTrace(originalException)).endl(); + try { + pyOnErrorCallback.call("__call__", ExceptionUtils.getStackTrace(originalException)); + } catch (Throwable e) { + // If the Python onFailure callback fails, log the new exception + // and continue with the original exception. + log.error().append("Python on_error callback failed: ").append(e).endl(); } } @Override public void onSuccess() { - if (pyOnSuccessCallable != null && !pyOnSuccessCallable.isNone()) { - pyOnSuccessCallable.call("__call__"); + if (pyOnSuccessCallback != null && !pyOnSuccessCallback.isNone()) { + pyOnSuccessCallback.call("__call__"); } else { InputTableStatusListener.super.onSuccess(); } diff --git a/py/server/deephaven/table_factory.py b/py/server/deephaven/table_factory.py index 9826891e850..5c5a20d5420 100644 --- a/py/server/deephaven/table_factory.py +++ b/py/server/deephaven/table_factory.py @@ -255,8 +255,7 @@ def __init__(self, j_table: jpy.JType): def add(self, table: Table) -> None: """Synchronously writes rows from the provided table to this input table. If this is a keyed input table, - added rows with keys - that match existing rows will replace those rows. + added rows with keys that match existing rows will replace those rows. Args: table (Table): the table that provides the rows to write @@ -271,8 +270,7 @@ def add(self, table: Table) -> None: def delete(self, table: Table) -> None: """Synchronously deletes the keys contained in the provided table from this keyed input table. If this - method is called on an - append-only input table, an error will be raised. + method is called on an append-only input table, an error will be raised. Args: table (Table): the table with the keys to delete @@ -293,6 +291,9 @@ def add_async(self, table: Table, on_success: Callable[[], None] = None, will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error is not provided, a default callback function will be called that simply prints out the received exception. + Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering + is not guaranteed across threads. + Args: table (Table): the table that provides the rows to write on_success (Callable[[], None]): the success callback function, default is None @@ -326,6 +327,9 @@ def delete_async(self, table: Table, on_success: Callable[[], None] = None, will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error is not provided, a default callback function will be called that simply prints out the received exception. + Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering + is not guaranteed across threads. + Args: table (Table): the table with the keys to delete on_success (Callable[[], None]): the success callback function, default is None