diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java index ae95d86b8ee..1fc5e540b88 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java @@ -6,12 +6,7 @@ import gnu.trove.map.hash.TObjectIntHashMap; import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; -import io.deephaven.engine.rowset.RowSequence; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetBuilderRandom; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.RowSetShiftData; -import io.deephaven.engine.rowset.WritableRowSet; +import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; @@ -56,7 +51,7 @@ class PartitioningColumnDataIndex extends AbstractDataIndex { /** Provides fast lookup from keys to positions in the index table **/ private final TObjectIntHashMap keyPositionMap; - private final ModifiedColumnSet upstreamLocationModified; + private final ModifiedColumnSet upstreamKeyModified; private final ModifiedColumnSet upstreamRowSetModified; private final ModifiedColumnSet downstreamRowSetModified; @@ -65,7 +60,7 @@ class PartitioningColumnDataIndex extends AbstractDataIndex { * {@link RegionedColumnSourceManager} at a time when there cannot be any concurrent "refresh" behavior, and so we * can safely use the {@link RegionedColumnSourceManager#locationTable() location table} without snapshotting or * considering previous values. - * + * * @param keyColumnName The key column name * @param keySource The key source in the indexed table * @param columnSourceManager The column source manager that provides locations and region indexes @@ -113,7 +108,7 @@ class PartitioningColumnDataIndex extends AbstractDataIndex { if (locationTable.isRefreshing()) { // No need to track previous values; we mutate the index table's RowSets in-place, and we never move a key. indexTable.getRowSet().writableCast().initializePreviousValue(); - upstreamLocationModified = locationTable.newModifiedColumnSet(columnSourceManager.locationColumnName()); + upstreamKeyModified = locationTable.newModifiedColumnSet(keyColumnName); upstreamRowSetModified = locationTable.newModifiedColumnSet(columnSourceManager.rowSetColumnName()); downstreamRowSetModified = indexTable.newModifiedColumnSet(rowSetColumnName()); final TableUpdateListener tableListener = new BaseTable.ListenerImpl(String.format( @@ -126,7 +121,7 @@ public void onUpdate(@NotNull final TableUpdate upstream) { locationTable.addUpdateListener(tableListener); manage(indexTable); } else { - upstreamLocationModified = null; + upstreamKeyModified = null; upstreamRowSetModified = null; downstreamRowSetModified = null; } @@ -138,29 +133,30 @@ private synchronized void processUpdate( if (upstream.empty()) { return; } - if (upstream.removed().isNonempty()) { - throw new UnsupportedOperationException("Removed locations are not currently supported"); - } if (upstream.shifted().nonempty()) { throw new UnsupportedOperationException("Shifted locations are not currently supported"); } - if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamLocationModified)) { - throw new UnsupportedOperationException("Modified locations are not currently supported"); + if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamKeyModified)) { + throw new UnsupportedOperationException("Modified location keys are not currently supported"); } Assert.assertion(initializing || isRefreshing(), "initializing || isRefreshing()"); final int previousSize = keyPositionMap.size(); - final RowSetBuilderRandom modifiedBuilder = initializing ? null : RowSetFactory.builderRandom(); + final RowSetBuilderRandom modifiedPositionBuilder = initializing ? null : RowSetFactory.builderRandom(); - if (upstream.added().isNonempty()) { - upstream.added().forAllRowKeys((final long locationRowKey) -> handleKey( - locationRowKey, false, previousSize, modifiedBuilder)); + if (upstream.removed().isNonempty()) { + Assert.eqFalse(initializing, "initializing"); + upstream.removed().forAllRowKeys((final long locationRowKey) -> handleLocation( + locationRowKey, ChangeType.REMOVE, modifiedPositionBuilder)); } - if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamRowSetModified)) { Assert.eqFalse(initializing, "initializing"); - upstream.modified().forAllRowKeys((final long locationRowKey) -> handleKey( - locationRowKey, true, previousSize, modifiedBuilder)); + upstream.modified().forAllRowKeys((final long locationRowKey) -> handleLocation( + locationRowKey, ChangeType.MODIFY, modifiedPositionBuilder)); + } + if (upstream.added().isNonempty()) { + upstream.added().forAllRowKeys((final long locationRowKey) -> handleLocation( + locationRowKey, ChangeType.ADD, modifiedPositionBuilder)); } final int newSize = keyPositionMap.size(); @@ -172,40 +168,89 @@ private synchronized void processUpdate( return; } - // Send the downstream updates to any listeners of the index table - final WritableRowSet modified = modifiedBuilder.build(); + final WritableRowSet modified = modifiedPositionBuilder.build(); if (previousSize == newSize && modified.isEmpty()) { modified.close(); return; } + final RowSetBuilderSequential removedPositionsBuilder = RowSetFactory.builderSequential(); + final RowSetBuilderSequential resurrectedPositionsBuilder = RowSetFactory.builderSequential(); + modified.forAllRowKeys((final long pos) -> { + final RowSet indexRowSet = indexRowSetSource.get(pos); + // noinspection DataFlowIssue + if (indexRowSet.isEmpty()) { + removedPositionsBuilder.appendKey(pos); + } else if (indexRowSet.trackingCast().prev().isEmpty()) { + resurrectedPositionsBuilder.appendKey(pos); + } + }); + + final WritableRowSet added = RowSetFactory.fromRange(previousSize, newSize - 1); + final RowSet removed = removedPositionsBuilder.build(); + modified.remove(removed); + try (final RowSet resurrected = resurrectedPositionsBuilder.build()) { + added.insert(resurrected); + modified.remove(resurrected); + } + + // Send the downstream updates to any listeners of the index table final TableUpdate downstream = new TableUpdateImpl( - RowSetFactory.fromRange(previousSize, newSize - 1), - RowSetFactory.empty(), + added, + removed, modified, RowSetShiftData.EMPTY, modified.isNonempty() ? downstreamRowSetModified : ModifiedColumnSet.EMPTY); indexTable.notifyListeners(downstream); } - private void handleKey( + private enum ChangeType { + // @formatter:off + ADD("Added"), + REMOVE("Removed"), + MODIFY("Modified"); + // @formatter:on + + private final String actionLabel; + + ChangeType(@NotNull final String actionLabel) { + this.actionLabel = actionLabel; + } + } + + private void handleLocation( final long locationRowKey, - final boolean isModify, - final int previousSize, - @Nullable final RowSetBuilderRandom modifiedBuilder) { + @NotNull final ChangeType changeType, + @Nullable final RowSetBuilderRandom modifiedPositionBuilder) { final KEY_TYPE locationKey = locationTableKeySource.get(locationRowKey); final Object locationKeyReinterpreted = locationTableKeySourceReinterpreted.get(locationRowKey); - final RowSet regionRowSet = locationTableRowSetSource.get(locationRowKey); - if (regionRowSet == null) { - throw new IllegalStateException(String.format("Null row set found at location index %d", locationRowKey)); + + final RowSet currentRegionRowSet = changeType == ChangeType.REMOVE + ? null + : locationTableRowSetSource.get(locationRowKey); + final RowSet previousRegionRowSet = changeType == ChangeType.ADD + ? null + : locationTableRowSetSource.getPrev(locationRowKey); + + if (changeType != ChangeType.REMOVE && (currentRegionRowSet == null || currentRegionRowSet.isEmpty())) { + throw new IllegalStateException(String.format( + "%s partition (index=%d, key=%s): Unexpected null or empty current row set", + changeType.actionLabel, locationRowKey, locationKey)); + } + if (changeType != ChangeType.ADD && (previousRegionRowSet == null || previousRegionRowSet.isEmpty())) { + throw new IllegalStateException(String.format( + "%s partition (index=%d, key=%s): Unexpected null or empty previous row set", + changeType.actionLabel, locationRowKey, locationKey)); } - final long regionFirstRowKey = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)); // Test using the (maybe) reinterpreted key final int pos = keyPositionMap.get(locationKeyReinterpreted); + + // Inserting a new bucket if (pos == KEY_NOT_FOUND) { - if (isModify) { - throw new IllegalStateException(String.format("Modified partition key %s not found", locationKey)); + if (changeType == ChangeType.REMOVE || changeType == ChangeType.MODIFY) { + throw new IllegalStateException(String.format("%s partition (index=%d, key=%s): Key not found", + changeType.actionLabel, locationRowKey, locationKey)); } final int addedKeyPos = keyPositionMap.size(); // Store the (maybe) reinterpreted key in the lookup hashmap. @@ -216,21 +261,27 @@ private void handleKey( indexKeySource.set(addedKeyPos, locationKey); indexRowSetSource.ensureCapacity(addedKeyPos + 1); - indexRowSetSource.set(addedKeyPos, regionRowSet.shift(regionFirstRowKey).toTracking()); - } else { - // noinspection DataFlowIssue - final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast(); - try (final WritableRowSet shiftedRowSet = regionRowSet.shift(regionFirstRowKey)) { - // We could assert that: - // 1. an added location is non-overlapping with the key's existing row set - // 2. a modified location's current row set is a superset of its previous row set - // 3. a modified location's previous row set is a subset of the key's existing row set - existingRowSet.insert(shiftedRowSet); - } + indexRowSetSource.set(addedKeyPos, currentRegionRowSet.copy().toTracking()); + return; + } - if (modifiedBuilder != null && pos < previousSize) { - modifiedBuilder.addKey(pos); - } + // Updating an existing bucket + // noinspection DataFlowIssue + final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast(); + // We _could_ assert that: + // 1. An added location is non-overlapping with the key's existing row set + // 2. A modified location's current row set is a superset of its previous row set (with existing RCSM) + // 3. A removed or modified location's previous row set is a subset of the key's existing row set + if (previousRegionRowSet != null) { + existingRowSet.remove(previousRegionRowSet); + } + if (currentRegionRowSet != null) { + existingRowSet.insert(currentRegionRowSet); + } + if (modifiedPositionBuilder != null) { + // Note that once done processing everything, we're going to adjust this to pull out transitions _from_ + // empty as adds and _to_ empty as removes. + modifiedPositionBuilder.addKey(pos); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java index 274f603835a..cb71d45053f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java @@ -35,6 +35,9 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK; +import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.getFirstRowKey; + /** * Manage column sources made up of regions in their own row key address space. */ @@ -369,10 +372,10 @@ private TableUpdateImpl update(final boolean initializing) { if (entry.pollUpdates(addedRowSetBuilder)) { // Changes were detected, update the row set in the table and mark the row/column as modified. /* - * Since TableLocationState.getRowSet() returns a copy(), we own entry.rowSetAtLastUpdate and can - * propagate it without making another copy(). + * We should consider adding an UpdateCommitter to close() the previous row sets for modified locations. + * This is not important for current implementations, since they always allocate new, flat RowSets. */ - rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate); + rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex))); if (modifiedRegionBuilder != null) { modifiedRegionBuilder.appendKey(entry.regionIndex); } @@ -426,7 +429,7 @@ private TableUpdateImpl update(final boolean initializing) { wcs.set(entry.regionIndex, entry.location.getKey().getPartitionValue(key))); // @formatter:on locationSource.set(entry.regionIndex, entry.location); - rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate); + rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex))); addedRegionBuilder.appendKey(entry.regionIndex); }); } @@ -584,14 +587,14 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi Assert.neqNull(initialRowSet, "initialRowSet"); Assert.eqTrue(initialRowSet.isNonempty(), "initialRowSet.isNonempty()"); Assert.eqNull(rowSetAtLastUpdate, "rowSetAtLastUpdate"); - if (initialRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) { + if (initialRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) { throw new TableDataException(String.format( "Location %s has initial last key %#016X, larger than maximum supported key %#016X", location, initialRowSet.lastRowKey(), - RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK)); + ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK)); } - final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex); + final long regionFirstKey = getFirstRowKey(regionIndex); initialRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder .appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey)); @@ -644,11 +647,11 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) { // Nothing to do return false; } - if (updateRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) { + if (updateRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) { throw new TableDataException(String.format( "Location %s has updated last key %#016X, larger than maximum supported key %#016X", location, updateRowSet.lastRowKey(), - RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK)); + ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK)); } if (log.isDebugEnabled()) { @@ -657,7 +660,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) { .append(",TO:").append(updateRowSet.size()).endl(); } try (final RowSet addedRowSet = updateRowSet.minus(rowSetAtLastUpdate)) { - final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex); + final long regionFirstKey = getFirstRowKey(regionIndex); addedRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder .appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey)); }