diff --git a/engine/table/src/main/java/io/deephaven/engine/util/input/InputTableUpdater.java b/engine/table/src/main/java/io/deephaven/engine/util/input/InputTableUpdater.java index d006b2dadbd..271e3f312c3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/input/InputTableUpdater.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/input/InputTableUpdater.java @@ -91,9 +91,14 @@ default void validateDelete(Table tableToDelete) { /** * Write {@code newData} to this table. Added rows with keys that match existing rows will instead replace those * rows, if supported. + * + *

+ * This method will block until the add is "completed", where the definition of "completed" is implementation + * dependenent. + * *

- * This method will block until the rows are added. As a result, this method is not suitable for use from a - * {@link io.deephaven.engine.table.TableListener table listener} or any other + * For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable + * for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other * {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback * dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be * suitable to delete from another update graph if doing so does not introduce any cycles. @@ -106,6 +111,11 @@ default void validateDelete(Table tableToDelete) { /** * Write {@code newData} to this table. Added rows with keys that match existing rows replace those rows, if * supported. + * + *

+ * The callback to {@code listener} will happen when the add has "completed", where the definition of "completed" is + * implementation dependenent. It's possible that the callback happens immediately on the same thread. + * *

* This method will not block, and can be safely used from a {@link io.deephaven.engine.table.TableListener * table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification @@ -120,9 +130,14 @@ default void validateDelete(Table tableToDelete) { /** * Delete the keys contained in {@code table} from this input table. + * + *

+ * This method will block until the delete is "completed", where the definition of "completed" is implementation + * dependenent. + * *

- * This method will block until the rows are deleted. As a result, this method is not suitable for use from a - * {@link io.deephaven.engine.table.TableListener table listener} or any other + * For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable + * for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other * {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback * dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be * suitable to delete from another update graph if doing so does not introduce any cycles. @@ -137,6 +152,11 @@ default void delete(Table table) throws IOException { /** * Delete the keys contained in table from this input table. + * + *

+ * The callback to {@code listener} will happen when the delete has "completed", where the definition of "completed" + * is implementation dependenent. It's possible that the callback happens immediately on the same thread. + * *

* This method will not block, and can be safely used from a {@link io.deephaven.engine.table.TableListener * table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification diff --git a/engine/table/src/main/java/io/deephaven/stream/TableStreamPublisherImpl.java b/engine/table/src/main/java/io/deephaven/stream/TableStreamPublisherImpl.java index fb85b48eb30..3446b2828bb 100644 --- a/engine/table/src/main/java/io/deephaven/stream/TableStreamPublisherImpl.java +++ b/engine/table/src/main/java/io/deephaven/stream/TableStreamPublisherImpl.java @@ -113,14 +113,21 @@ public void add(Table newData) { } @Override - public List getKeyNames() { - return Collections.emptyList(); + public void addAsync(Table newData, InputTableStatusListener listener) { + try { + TableStreamPublisherImpl.this.add(newData); + } catch (Throwable t) { + listener.onError(t); + return; + } + listener.onSuccess(); } @Override - public void addAsync(Table newData, InputTableStatusListener listener) { - throw new UnsupportedOperationException("Table does not support async add"); + public List getKeyNames() { + return Collections.emptyList(); } + } private class FillChunks implements SnapshotFunction { diff --git a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java index 1436d7b6af4..456afdfdac4 100644 --- a/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/inputtables/InputTableServiceGrpcImpl.java @@ -10,6 +10,7 @@ import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; +import io.deephaven.engine.util.input.InputTableStatusListener; import io.deephaven.engine.util.input.InputTableUpdater; import io.deephaven.extensions.barrage.util.GrpcUtil; import io.deephaven.internal.log.LoggerFactory; @@ -96,13 +97,18 @@ public void addTableToInputTable( } // actually add the tables contents - try { - inputTableUpdater.add(tableToAdd); - GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance()); - } catch (IOException ioException) { - throw Exceptions.statusRuntimeException(Code.DATA_LOSS, - "Error adding table to input table"); - } + inputTableUpdater.addAsync(tableToAdd, new InputTableStatusListener() { + @Override + public void onSuccess() { + GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance()); + } + + @Override + public void onError(Throwable t) { + GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS, + "Error adding table to input table")); + } + }); }); } } @@ -157,13 +163,18 @@ public void deleteTableFromInputTable( } // actually delete the table's contents - try { - inputTableUpdater.delete(tableToRemove); - GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance()); - } catch (IOException ioException) { - throw Exceptions.statusRuntimeException(Code.DATA_LOSS, - "Error deleting table from inputtable"); - } + inputTableUpdater.deleteAsync(tableToRemove, new InputTableStatusListener() { + @Override + public void onSuccess() { + GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance()); + } + + @Override + public void onError(Throwable t) { + GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS, + "Error deleting table from inputtable")); + } + }); }); } }