Skip to content

Commit

Permalink
Loosen InputTableUpdater semantics.
Browse files Browse the repository at this point in the history
Also, updates InputTableService to use async methods.
  • Loading branch information
devinrsmith committed Dec 13, 2023
1 parent c0ae2c3 commit da41393
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows will instead replace those
* rows, if supported.
*
* <p>
* This method will block until the add is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are added. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -106,6 +111,11 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows replace those rows, if
* supported.
*
* <p>
* The callback to {@code listener} will happen when the add has "completed", where the definition of "completed" is
* implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -120,9 +130,14 @@ default void validateDelete(Table tableToDelete) {

/**
* Delete the keys contained in {@code table} from this input table.
*
* <p>
* This method will block until the delete is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are deleted. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -137,6 +152,11 @@ default void delete(Table table) throws IOException {

/**
* Delete the keys contained in table from this input table.
*
* <p>
* The callback to {@code listener} will happen when the delete has "completed", where the definition of "completed"
* is implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,21 @@ public void add(Table newData) {
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
public void addAsync(Table newData, InputTableStatusListener listener) {
try {
TableStreamPublisherImpl.this.add(newData);
} catch (Throwable t) {
listener.onError(t);
return;
}
listener.onSuccess();
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support async add");
public List<String> getKeyNames() {
return Collections.emptyList();
}

}

private class FillChunks implements SnapshotFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -96,13 +97,18 @@ public void addTableToInputTable(
}

// actually add the tables contents
try {
inputTableUpdater.add(tableToAdd);
GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance());
} catch (IOException ioException) {
throw Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error adding table to input table");
}
inputTableUpdater.addAsync(tableToAdd, new InputTableStatusListener() {
@Override
public void onSuccess() {
GrpcUtil.safelyComplete(responseObserver, AddTableResponse.getDefaultInstance());
}

@Override
public void onError(Throwable t) {
GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error adding table to input table"));
}
});
});
}
}
Expand Down Expand Up @@ -157,13 +163,18 @@ public void deleteTableFromInputTable(
}

// actually delete the table's contents
try {
inputTableUpdater.delete(tableToRemove);
GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance());
} catch (IOException ioException) {
throw Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable");
}
inputTableUpdater.deleteAsync(tableToRemove, new InputTableStatusListener() {
@Override
public void onSuccess() {
GrpcUtil.safelyComplete(responseObserver, DeleteTableResponse.getDefaultInstance());
}

@Override
public void onError(Throwable t) {
GrpcUtil.safelyError(responseObserver, Exceptions.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable"));
}
});
});
}
}
Expand Down

0 comments on commit da41393

Please sign in to comment.