Skip to content

Commit

Permalink
Adds InputTableService support for blink tables (#4934)
Browse files Browse the repository at this point in the history
This adds `TablePublisher#inputTable`, which adds a very simple `InputTableUpdater` implementation to the blink table.

Also, some further simplifications in the spirit of #4923: `InputTableUpdater#getDescription`, `InputTableUpdater#getTable`, and `InputTableUpdater#canEdit` were removed due to no usages.

The documentation on `InputTableUpdater` was updated to make it more generally useful in other contexts, such as here with blink tables.

The `InputTableService` implementation is now taking advantage of the async methods on `InputTableUpdater`.

Fixes #4915
  • Loading branch information
devinrsmith committed Dec 14, 2023
1 parent 3dd2609 commit 0d8eba3
Show file tree
Hide file tree
Showing 15 changed files with 845 additions and 362 deletions.
656 changes: 389 additions & 267 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.cc

Large diffs are not rendered by default.

235 changes: 229 additions & 6 deletions cpp-client/deephaven/dhclient/proto/deephaven/proto/table.pb.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.util.KeyedArrayBackedInputTable;
import io.deephaven.engine.util.TableTools;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InMemoryAppendOnlyInputTable;
import io.deephaven.qst.table.InMemoryKeyBackedInputTable;
Expand All @@ -24,8 +25,10 @@
import io.deephaven.qst.table.Clock;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.TimeTable;
import io.deephaven.stream.TablePublisher;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -78,8 +81,8 @@ public final Table of(TicketTable ticketTable) {
}

@Override
public final UpdatableTable of(InputTable inputTable) {
return UpdatableTableAdapter.of(inputTable);
public final Table of(InputTable inputTable) {
return InputTableAdapter.of(inputTable);
}


Expand Down Expand Up @@ -153,10 +156,12 @@ public io.deephaven.base.clock.Clock visit(ClockSystem system) {
}
}

enum UpdatableTableAdapter implements InputTable.Visitor<UpdatableTable> {
enum InputTableAdapter implements InputTable.Visitor<Table> {
INSTANCE;

public static UpdatableTable of(InputTable inputTable) {
private static final AtomicInteger blinkTableCount = new AtomicInteger();

public static Table of(InputTable inputTable) {
return inputTable.walk(INSTANCE);
}

Expand All @@ -172,6 +177,15 @@ public UpdatableTable visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
final String[] keyColumnNames = inMemoryKeyBacked.keys().toArray(String[]::new);
return KeyedArrayBackedInputTable.make(definition, keyColumnNames);
}

@Override
public Table visit(BlinkInputTable blinkInputTable) {
final TableDefinition definition = DefinitionAdapter.of(blinkInputTable.schema());
return TablePublisher
.of(TableCreatorImpl.class.getSimpleName() + ".BLINK-" + blinkTableCount.getAndIncrement(),
definition, null, null)
.inputTable();
}
}

enum DefinitionAdapter implements TableSchema.Visitor<TableDefinition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,6 @@ private void checkAsyncEditSafety(@NotNull final Table changeData) {
}
}

@Override
public String getDescription() {
return description;
}

void waitForSequence(long sequence) {
if (updateGraph.exclusiveLock().isHeldByCurrentThread()) {
// We're holding the lock. currentTable had better be refreshing. Wait on its UGP condition
Expand Down Expand Up @@ -346,16 +341,5 @@ private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
return sources;
}

@Override
public Table getTable() {
return BaseArrayBackedInputTable.this;
}

@Override
public boolean canEdit() {
// TODO: Should we be more restrictive, or provide a mechanism for determining which users can edit this
// table beyond "they have a handle to it"?
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,14 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows will instead replace those
* rows, if supported.
*
* <p>
* This method will block until the add is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are added. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -106,6 +111,11 @@ default void validateDelete(Table tableToDelete) {
/**
* Write {@code newData} to this table. Added rows with keys that match existing rows replace those rows, if
* supported.
*
* <p>
* The callback to {@code listener} will happen when the add has "completed", where the definition of "completed" is
* implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -120,9 +130,14 @@ default void validateDelete(Table tableToDelete) {

/**
* Delete the keys contained in {@code table} from this input table.
*
* <p>
* This method will block until the delete is "completed", where the definition of "completed" is implementation
* dependenent.
*
* <p>
* This method will block until the rows are deleted. As a result, this method is not suitable for use from a
* {@link io.deephaven.engine.table.TableListener table listener} or any other
* For implementations where "completed" means "visible in the next update graph cycle", this method is not suitable
* for use from a {@link io.deephaven.engine.table.TableListener table listener} or any other
* {@link io.deephaven.engine.updategraph.NotificationQueue.Notification notification}-dispatched callback
* dispatched by this InputTable's {@link io.deephaven.engine.updategraph.UpdateGraph update graph}. It may be
* suitable to delete from another update graph if doing so does not introduce any cycles.
Expand All @@ -137,6 +152,11 @@ default void delete(Table table) throws IOException {

/**
* Delete the keys contained in table from this input table.
*
* <p>
* The callback to {@code listener} will happen when the delete has "completed", where the definition of "completed"
* is implementation dependenent. It's possible that the callback happens immediately on the same thread.
*
* <p>
* This method will <em>not</em> block, and can be safely used from a {@link io.deephaven.engine.table.TableListener
* table listener} or any other {@link io.deephaven.engine.updategraph.NotificationQueue.Notification
Expand All @@ -151,20 +171,6 @@ default void deleteAsync(Table table, InputTableStatusListener listener) {
throw new UnsupportedOperationException("Table does not support deletes");
}

/**
* Return a user-readable description of this InputTable.
*
* @return a description of this input table
*/
String getDescription();

/**
* Returns a Deephaven table that contains the current data for this InputTable.
*
* @return the current data in this InputTable.
*/
Table getTable();

/**
* Returns true if the specified column is a key.
*
Expand All @@ -184,12 +190,4 @@ default boolean isKey(String columnName) {
default boolean hasColumn(String columnName) {
return getTableDefinition().getColumnNames().contains(columnName);
}

/**
* Queries whether this InputTable is editable in the current context.
*
* @return true if this InputTable may be edited, false otherwise TODO (deephaven/deephaven-core/issues/255): Add
* AuthContext and whatever else is appropriate
*/
boolean canEdit();
}
25 changes: 25 additions & 0 deletions engine/table/src/main/java/io/deephaven/stream/TablePublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.annotations.TestUseOnly;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -169,6 +171,29 @@ public boolean isAlive() {
return adapter.isAlive();
}

/**
* Creates a new {@link Table#BLINK_TABLE_ATTRIBUTE blink table} with its {@link Table#getAttribute(String)
* attribute} {@value Table#INPUT_TABLE_ATTRIBUTE} set to an {@link InputTableUpdater} implementation based on
* {@code this}. The implementation's definition of "completed" with respect to {@link InputTableUpdater#add(Table)}
* and {@link InputTableUpdater#addAsync(Table, InputTableStatusListener)} matches the semantics provided by
* {@link #add(Table)} - that is, "completed" means that a snapshot of {@code newData} has been taken and handed
* off. The implementation does not implement {@link InputTableUpdater#delete(Table)} nor
* {@link InputTableUpdater#deleteAsync(Table, InputTableStatusListener)}.
*
* <p>
* May return {@code null} if invoked more than once and the initial caller does not enforce strong reachability of
* the result.
*
* @return the input-table blink table
*/
public Table inputTable() {
final Table table = adapter.table();
if (table == null) {
return null;
}
return table.withAttributes(Map.of(Table.INPUT_TABLE_ATTRIBUTE, publisher.inputTableUpdater()));
}

@TestUseOnly
void runForUnitTests() {
adapter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.SnapshotFunction;
import io.deephaven.engine.table.impl.remote.ConstructSnapshot.State;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableArray;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand Down Expand Up @@ -94,6 +97,39 @@ public void shutdown() {
}
}

public InputTableUpdater inputTableUpdater() {
return new InputTableAdapter();
}

private class InputTableAdapter implements InputTableUpdater {
@Override
public TableDefinition getTableDefinition() {
return definition;
}

@Override
public void add(Table newData) {
TableStreamPublisherImpl.this.add(newData);
}

@Override
public void addAsync(Table newData, InputTableStatusListener listener) {
try {
TableStreamPublisherImpl.this.add(newData);
} catch (Throwable t) {
listener.onError(t);
return;
}
listener.onSuccess();
}

@Override
public List<String> getKeyNames() {
return Collections.emptyList();
}

}

private class FillChunks implements SnapshotFunction {
private final Table table;
private final ColumnSource<?>[] sources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.deephaven.proto.backplane.grpc.Condition;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.Blink;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked;
import io.deephaven.proto.backplane.grpc.CrossJoinTablesRequest;
Expand Down Expand Up @@ -76,6 +77,7 @@
import io.deephaven.qst.table.AggregateAllTable;
import io.deephaven.qst.table.AggregateTable;
import io.deephaven.qst.table.AsOfJoinTable;
import io.deephaven.qst.table.BlinkInputTable;
import io.deephaven.qst.table.Clock.Visitor;
import io.deephaven.qst.table.ClockSystem;
import io.deephaven.qst.table.DropColumnsTable;
Expand Down Expand Up @@ -535,6 +537,11 @@ public InputTableKind visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return InputTableKind.newBuilder().setInMemoryKeyBacked(
InMemoryKeyBacked.newBuilder().addAllKeyColumns(inMemoryKeyBacked.keys())).build();
}

@Override
public InputTableKind visit(BlinkInputTable blinkInputTable) {
return InputTableKind.newBuilder().setBlink(Blink.getDefaultInstance()).build();
}
}));
return op(Builder::setCreateInputTable, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,12 @@ message CreateInputTableRequest {
message InMemoryKeyBacked {
repeated string key_columns = 1;
}
message Blink {
}
oneof kind {
InMemoryAppendOnly in_memory_append_only = 1;
InMemoryKeyBacked in_memory_key_backed = 2;
Blink blink = 3;
}
}

Expand Down
56 changes: 29 additions & 27 deletions py/client/pydeephaven/proto/table_pb2.py

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/BlinkInputTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.qst.table;

import io.deephaven.annotations.NodeStyle;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;

import java.util.UUID;

/**
* Creates a blink input-table.
*/
@Immutable
@NodeStyle
public abstract class BlinkInputTable extends InputTableBase {

public static BlinkInputTable of(TableSchema schema) {
return ImmutableBlinkInputTable.of(schema, UUID.randomUUID());
}

@Parameter
public abstract TableSchema schema();

@Parameter
abstract UUID id();

@Override
public final <R> R walk(InputTable.Visitor<R> visitor) {
return visitor.visit(this);
}
}
2 changes: 2 additions & 0 deletions qst/src/main/java/io/deephaven/qst/table/InputTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ interface Visitor<R> {
R visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly);

R visit(InMemoryKeyBackedInputTable inMemoryKeyBacked);

R visit(BlinkInputTable blinkInputTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public String visit(InMemoryAppendOnlyInputTable inMemoryAppendOnly) {
public String visit(InMemoryKeyBackedInputTable inMemoryKeyBacked) {
return "InMemoryKeyBackedInputTable(...)";
}

@Override
public String visit(BlinkInputTable blinkInputTable) {
return "BlinkInputTable(...)";
}
});
}

Expand Down
Loading

0 comments on commit 0d8eba3

Please sign in to comment.