From 110449d14bbb038db1d896b499ca130158ebeeb4 Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Tue, 16 Jan 2024 10:47:40 -0700 Subject: [PATCH] BarrageTable: Use WritableRowRedirectionLockFree in Most Cases (#5038) --- .../impl/util/WritableRowRedirection.java | 2 +- .../barrage/table/BarrageBlinkTable.java | 2 +- .../barrage/table/BarrageRedirectedTable.java | 4 +++ .../barrage/table/BarrageTable.java | 27 +++++++++++++------ .../extensions/barrage/util/BarrageUtil.java | 27 +++++++++++++++---- .../PartitionedTableTypePlugin.java | 3 ++- .../server/arrow/ArrowFlightUtil.java | 4 +-- .../barrage/BarrageMessageProducer.java | 2 +- 8 files changed, 52 insertions(+), 19 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/WritableRowRedirection.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/WritableRowRedirection.java index cd293678c74..799563e5de4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/WritableRowRedirection.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/WritableRowRedirection.java @@ -171,7 +171,7 @@ default void applyShift(final RowSet tableRowSet, final RowSetShiftData shiftDat } /** - * Factory for producing WritableRowSets and their components. + * Factory for producing WritableRowRedirections and their components. */ interface Factory { TLongLongMap createUnderlyingMapWithCapacity(int initialCapacity); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java index 71425e7e234..88b88fad2c6 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java @@ -26,7 +26,6 @@ import org.jetbrains.annotations.Nullable; import java.util.ArrayDeque; -import java.util.BitSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -50,6 +49,7 @@ protected BarrageBlinkTable( final Map attributes, @Nullable final ViewportChangedCallback vpCallback) { super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback); + setFlat(); } private void processUpdate(final BarrageMessage update) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java index 858862712e7..3b7b37e3b1d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java @@ -55,9 +55,13 @@ protected BarrageRedirectedTable(final UpdateSourceRegistrar registrar, final WritableColumnSource[] writableSources, final WritableRowRedirection rowRedirection, final Map attributes, + final boolean isFlat, @Nullable final ViewportChangedCallback vpCallback) { super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback); this.rowRedirection = rowRedirection; + if (isFlat) { + setFlat(); + } } private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateCoalescer coalescer) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 8622c70b472..2b20f937e9b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -12,6 +12,7 @@ import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.InstrumentedTableUpdateSource; +import io.deephaven.engine.table.impl.util.*; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; @@ -22,12 +23,10 @@ import io.deephaven.engine.table.impl.sources.LongSparseArraySource; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource; -import io.deephaven.engine.table.impl.util.BarrageMessage; -import io.deephaven.engine.table.impl.util.LongColumnSourceWritableRowRedirection; -import io.deephaven.engine.table.impl.util.WritableRowRedirection; import io.deephaven.engine.updategraph.*; import io.deephaven.extensions.barrage.BarragePerformanceLog; import io.deephaven.extensions.barrage.BarrageSubscriptionPerformanceLogger; +import io.deephaven.extensions.barrage.util.BarrageUtil; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.log.LogEntry; import io.deephaven.io.log.LogLevel; @@ -46,6 +45,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Function; import java.util.function.LongConsumer; +import java.util.function.Predicate; /** * A client side {@link Table} that mirrors an upstream/server side {@code Table}. @@ -449,18 +449,29 @@ public static BarrageTable make( final BarrageTable table; - Object isBlinkTable = attributes.getOrDefault(Table.BLINK_TABLE_ATTRIBUTE, false); - if (isBlinkTable instanceof Boolean && (Boolean) isBlinkTable) { + final Predicate getAttribute = attr -> { + final Object value = attributes.getOrDefault(attr, false); + return value instanceof Boolean && (Boolean) value; + }; + + if (getAttribute.test(Table.BLINK_TABLE_ATTRIBUTE)) { final LinkedHashMap> finalColumns = makeColumns(columns, writableSources); table = new BarrageBlinkTable( registrar, queue, executor, finalColumns, writableSources, attributes, vpCallback); } else { - final WritableRowRedirection rowRedirection = - new LongColumnSourceWritableRowRedirection(new LongSparseArraySource()); + final WritableRowRedirection rowRedirection; + final boolean isFlat = getAttribute.test(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT); + if (getAttribute.test(Table.APPEND_ONLY_TABLE_ATTRIBUTE) || isFlat) { + rowRedirection = new LongColumnSourceWritableRowRedirection(new LongSparseArraySource()); + } else { + rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(1024); + } + final LinkedHashMap> finalColumns = makeColumns(columns, writableSources, rowRedirection); table = new BarrageRedirectedTable( - registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, vpCallback); + registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, isFlat, + vpCallback); } return table; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index 5ca0e20d027..a9e82f3907c 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -99,6 +99,9 @@ public class BarrageUtil { public static final ArrowType.Timestamp NANO_SINCE_EPOCH_TYPE = new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"); + /** The name of the attribute that indicates that a table is flat. */ + public static final String TABLE_ATTRIBUTE_IS_FLAT = "IsFlat"; + private static final int ATTR_STRING_LEN_CUTOFF = 1024; private static final String ATTR_DH_PREFIX = "deephaven:"; @@ -119,14 +122,15 @@ public class BarrageUtil { Boolean.class)); public static ByteString schemaBytesFromTable(@NotNull final Table table) { - return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes()); + return schemaBytesFromTableDefinition(table.getDefinition(), table.getAttributes(), table.isFlat()); } public static ByteString schemaBytesFromTableDefinition( @NotNull final TableDefinition tableDefinition, - @NotNull final Map attributes) { + @NotNull final Map attributes, + final boolean isFlat) { return schemaBytes(fbb -> makeTableSchemaPayload( - fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes)); + fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes, isFlat)); } public static ByteString schemaBytes(@NotNull final ToIntFunction schemaPayloadWriter) { @@ -146,8 +150,9 @@ public static int makeTableSchemaPayload( @NotNull final FlatBufferBuilder builder, @NotNull final StreamReaderOptions options, @NotNull final TableDefinition tableDefinition, - @NotNull final Map attributes) { - final Map schemaMetadata = attributesToMetadata(attributes); + @NotNull final Map attributes, + final boolean isFlat) { + final Map schemaMetadata = attributesToMetadata(attributes, isFlat); final Map descriptions = GridAttributes.getColumnDescriptions(attributes); final InputTableUpdater inputTableUpdater = (InputTableUpdater) attributes.get(Table.INPUT_TABLE_ATTRIBUTE); @@ -162,7 +167,19 @@ public static int makeTableSchemaPayload( @NotNull public static Map attributesToMetadata(@NotNull final Map attributes) { + return attributesToMetadata(attributes, false); + } + + @NotNull + public static Map attributesToMetadata( + @NotNull final Map attributes, + final boolean isFlat) { final Map metadata = new HashMap<>(); + if (isFlat) { + putMetadata(metadata, ATTR_ATTR_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT, "true"); + putMetadata(metadata, ATTR_ATTR_TYPE_TAG + "." + TABLE_ATTRIBUTE_IS_FLAT, + Boolean.class.getCanonicalName()); + } for (final Map.Entry entry : attributes.entrySet()) { final String key = entry.getKey(); final Object val = entry.getValue(); diff --git a/plugin/partitionedtable/src/main/java/io/deephaven/partitionedtable/PartitionedTableTypePlugin.java b/plugin/partitionedtable/src/main/java/io/deephaven/partitionedtable/PartitionedTableTypePlugin.java index 57e32281d20..ff394a8b189 100644 --- a/plugin/partitionedtable/src/main/java/io/deephaven/partitionedtable/PartitionedTableTypePlugin.java +++ b/plugin/partitionedtable/src/main/java/io/deephaven/partitionedtable/PartitionedTableTypePlugin.java @@ -38,7 +38,8 @@ public void writeCompatibleObjectTo(Exporter exporter, Object object, OutputStre // Send Schema wrapped in Message ByteString schemaWrappedInMessage = BarrageUtil.schemaBytesFromTableDefinition( partitionedTable.constituentDefinition(), - Collections.emptyMap()); + Collections.emptyMap(), + false); PartitionedTableDescriptor result = PartitionedTableDescriptor.newBuilder() .addAllKeyColumnNames(partitionedTable.keyColumnNames()) diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 73df0eead12..68e2138dcaf 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -102,7 +102,7 @@ public static void DoGetCustom( // push the schema to the listener listener.onNext(streamGeneratorFactory.getSchemaView( fbb -> BarrageUtil.makeTableSchemaPayload(fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, - table.getDefinition(), table.getAttributes()))); + table.getDefinition(), table.getAttributes(), table.isFlat()))); // shared code between `DoGet` and `BarrageSnapshotRequest` BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false, @@ -519,7 +519,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { listener.onNext(streamGeneratorFactory.getSchemaView( fbb -> BarrageUtil.makeTableSchemaPayload(fbb, snapshotOptAdapter.adapt(snapshotRequest), - table.getDefinition(), table.getAttributes()))); + table.getDefinition(), table.getAttributes(), table.isFlat()))); // collect the viewport and columnsets (if provided) final boolean hasColumns = snapshotRequest.columnsVector() != null; diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 33b48e654dc..b341e15b48c 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -1603,7 +1603,7 @@ private void propagateSnapshotForSubscription( // Send schema metadata to this new client. subscription.listener.onNext(streamGeneratorFactory.getSchemaView( fbb -> BarrageUtil.makeTableSchemaPayload(fbb, subscription.options, - parent.getDefinition(), parent.getAttributes()))); + parent.getDefinition(), parent.getAttributes(), parent.isFlat()))); } // some messages may be empty of rows, but we need to update the client viewport and column set