Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor PartitioningColumnDataIndex in support of refreshing Iceberg tables #5921

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
import gnu.trove.map.hash.TObjectIntHashMap;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
Expand Down Expand Up @@ -56,7 +51,7 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
/** Provides fast lookup from keys to positions in the index table **/
private final TObjectIntHashMap<Object> keyPositionMap;

private final ModifiedColumnSet upstreamLocationModified;
private final ModifiedColumnSet upstreamKeyModified;
private final ModifiedColumnSet upstreamRowSetModified;
private final ModifiedColumnSet downstreamRowSetModified;

Expand Down Expand Up @@ -113,7 +108,7 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
if (locationTable.isRefreshing()) {
// No need to track previous values; we mutate the index table's RowSets in-place, and we never move a key.
indexTable.getRowSet().writableCast().initializePreviousValue();
upstreamLocationModified = locationTable.newModifiedColumnSet(columnSourceManager.locationColumnName());
upstreamKeyModified = locationTable.newModifiedColumnSet(keyColumnName);
upstreamRowSetModified = locationTable.newModifiedColumnSet(columnSourceManager.rowSetColumnName());
downstreamRowSetModified = indexTable.newModifiedColumnSet(rowSetColumnName());
final TableUpdateListener tableListener = new BaseTable.ListenerImpl(String.format(
Expand All @@ -126,7 +121,7 @@ public void onUpdate(@NotNull final TableUpdate upstream) {
locationTable.addUpdateListener(tableListener);
manage(indexTable);
} else {
upstreamLocationModified = null;
upstreamKeyModified = null;
upstreamRowSetModified = null;
downstreamRowSetModified = null;
}
Expand All @@ -138,29 +133,30 @@ private synchronized void processUpdate(
if (upstream.empty()) {
return;
}
if (upstream.removed().isNonempty()) {
throw new UnsupportedOperationException("Removed locations are not currently supported");
}
if (upstream.shifted().nonempty()) {
throw new UnsupportedOperationException("Shifted locations are not currently supported");
}
if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamLocationModified)) {
throw new UnsupportedOperationException("Modified locations are not currently supported");
if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamKeyModified)) {
throw new UnsupportedOperationException("Modified location keys are not currently supported");
}
Assert.assertion(initializing || isRefreshing(), "initializing || isRefreshing()");

final int previousSize = keyPositionMap.size();
final RowSetBuilderRandom modifiedBuilder = initializing ? null : RowSetFactory.builderRandom();
final RowSetBuilderRandom modifiedPositionBuilder = initializing ? null : RowSetFactory.builderRandom();

if (upstream.added().isNonempty()) {
upstream.added().forAllRowKeys((final long locationRowKey) -> handleKey(
locationRowKey, false, previousSize, modifiedBuilder));
if (upstream.removed().isNonempty()) {
Assert.eqFalse(initializing, "initializing");
upstream.removed().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.REMOVE, modifiedPositionBuilder));
}

if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamRowSetModified)) {
Assert.eqFalse(initializing, "initializing");
upstream.modified().forAllRowKeys((final long locationRowKey) -> handleKey(
locationRowKey, true, previousSize, modifiedBuilder));
upstream.modified().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.MODIFY, modifiedPositionBuilder));
}
if (upstream.added().isNonempty()) {
upstream.added().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.ADD, modifiedPositionBuilder));
}

final int newSize = keyPositionMap.size();
Expand All @@ -172,40 +168,89 @@ private synchronized void processUpdate(
return;
}

// Send the downstream updates to any listeners of the index table
final WritableRowSet modified = modifiedBuilder.build();
final WritableRowSet modified = modifiedPositionBuilder.build();
if (previousSize == newSize && modified.isEmpty()) {
modified.close();
return;
}

final RowSetBuilderSequential removedPositionsBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential resurrectedPositionsBuilder = RowSetFactory.builderSequential();
modified.forAllRowKeys((final long pos) -> {
final RowSet indexRowSet = indexRowSetSource.get(pos);
// noinspection DataFlowIssue
if (indexRowSet.isEmpty()) {
removedPositionsBuilder.appendKey(pos);
} else if (indexRowSet.trackingCast().prev().isEmpty()) {
resurrectedPositionsBuilder.appendKey(pos);
}
});

final WritableRowSet added = RowSetFactory.fromRange(previousSize, newSize - 1);
final RowSet removed = removedPositionsBuilder.build();
modified.remove(removed);
try (final RowSet resurrected = resurrectedPositionsBuilder.build()) {
added.insert(resurrected);
modified.remove(resurrected);
}

// Send the downstream updates to any listeners of the index table
final TableUpdate downstream = new TableUpdateImpl(
RowSetFactory.fromRange(previousSize, newSize - 1),
RowSetFactory.empty(),
added,
removed,
modified,
RowSetShiftData.EMPTY,
modified.isNonempty() ? downstreamRowSetModified : ModifiedColumnSet.EMPTY);
indexTable.notifyListeners(downstream);
}

private void handleKey(
/**
 * The kind of change being applied to a single location-table row by {@code handleLocation}.
 * <p>
 * The change type decides which row sets {@code handleLocation} reads for the location: the current row set is
 * skipped for {@link #REMOVE}, and the previous row set is skipped for {@link #ADD}. It also decides whether a
 * missing key is tolerated: only {@link #ADD} may introduce a key that is not yet in the key-position map;
 * {@link #REMOVE} and {@link #MODIFY} raise an {@link IllegalStateException} in that case.
 */
private enum ChangeType {
// @formatter:off
/** A location row was added; only its current row set is consulted. */
ADD("Added"),
/** A location row was removed; only its previous row set is consulted. */
REMOVE("Removed"),
/** A location row was modified; both its previous and current row sets are consulted. */
MODIFY("Modified");
// @formatter:on

/** Human-readable label used as the leading word of the error messages built in {@code handleLocation}. */
private final String actionLabel;

/**
 * @param actionLabel the label to report for this change type in error messages
 */
ChangeType(@NotNull final String actionLabel) {
this.actionLabel = actionLabel;
}
}

private void handleLocation(
final long locationRowKey,
final boolean isModify,
final int previousSize,
@Nullable final RowSetBuilderRandom modifiedBuilder) {
@NotNull final ChangeType changeType,
@Nullable final RowSetBuilderRandom modifiedPositionBuilder) {
final KEY_TYPE locationKey = locationTableKeySource.get(locationRowKey);
final Object locationKeyReinterpreted = locationTableKeySourceReinterpreted.get(locationRowKey);
final RowSet regionRowSet = locationTableRowSetSource.get(locationRowKey);
if (regionRowSet == null) {
throw new IllegalStateException(String.format("Null row set found at location index %d", locationRowKey));

final RowSet currentRegionRowSet = changeType == ChangeType.REMOVE
? null
: locationTableRowSetSource.get(locationRowKey);
final RowSet previousRegionRowSet = changeType == ChangeType.ADD
? null
: locationTableRowSetSource.getPrev(locationRowKey);

if (changeType != ChangeType.REMOVE && (currentRegionRowSet == null || currentRegionRowSet.isEmpty())) {
throw new IllegalStateException(String.format(
"%s partition (index=%d, key=%s): Unexpected null or empty current row set",
changeType.actionLabel, locationRowKey, locationKey));
}
if (changeType != ChangeType.ADD && (previousRegionRowSet == null || previousRegionRowSet.isEmpty())) {
throw new IllegalStateException(String.format(
"%s partition (index=%d, key=%s): Unexpected null or empty previous row set",
changeType.actionLabel, locationRowKey, locationKey));
}

final long regionFirstRowKey = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey));
// Test using the (maybe) reinterpreted key
final int pos = keyPositionMap.get(locationKeyReinterpreted);

// Inserting a new bucket
if (pos == KEY_NOT_FOUND) {
if (isModify) {
throw new IllegalStateException(String.format("Modified partition key %s not found", locationKey));
if (changeType == ChangeType.REMOVE || changeType == ChangeType.MODIFY) {
throw new IllegalStateException(String.format("%s partition (index=%d, key=%s): Key not found",
changeType.actionLabel, locationRowKey, locationKey));
}
final int addedKeyPos = keyPositionMap.size();
// Store the (maybe) reinterpreted key in the lookup hashmap.
Expand All @@ -216,21 +261,27 @@ private void handleKey(
indexKeySource.set(addedKeyPos, locationKey);

indexRowSetSource.ensureCapacity(addedKeyPos + 1);
indexRowSetSource.set(addedKeyPos, regionRowSet.shift(regionFirstRowKey));
} else {
// noinspection DataFlowIssue
final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast();
try (final WritableRowSet shiftedRowSet = regionRowSet.shift(regionFirstRowKey)) {
// We could assert that:
// 1. an added location is non-overlapping with the key's existing row set
// 2. a modified location's current row set is a superset of its previous row set
// 3. a modified location's previous row set is a subset of the key's existing row set
existingRowSet.insert(shiftedRowSet);
}
indexRowSetSource.set(addedKeyPos, currentRegionRowSet.copy().toTracking());
return;
}

if (modifiedBuilder != null && pos < previousSize) {
modifiedBuilder.addKey(pos);
}
// Updating an existing bucket
// noinspection DataFlowIssue
final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast();
// We _could_ assert that:
// 1. An added location is non-overlapping with the key's existing row set
// 2. A modified location's current row set is a superset of its previous row set (with existing RCSM)
// 3. A removed or modified location's previous row set is a subset of the key's existing row set
if (previousRegionRowSet != null) {
existingRowSet.remove(previousRegionRowSet);
}
if (currentRegionRowSet != null) {
existingRowSet.insert(currentRegionRowSet);
}
if (modifiedPositionBuilder != null) {
// Note that once done processing everything, we're going to adjust this to pull out transitions _from_
// empty as adds and _to_ empty as removes.
modifiedPositionBuilder.addKey(pos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK;
import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.getFirstRowKey;

/**
* Manage column sources made up of regions in their own row key address space.
*/
Expand Down Expand Up @@ -291,11 +294,10 @@ private WritableRowSet update(final boolean initializing) {
if (entry.pollUpdates(addedRowSetBuilder)) {
// Changes were detected, update the row set in the table and mark the row/column as modified.
/*
* Since TableLocationState.getRowSet() returns a copy(), we should consider adding an UpdateCommitter
* to close() the previous row sets for modified locations. This is not important for current
* implementations, since they always allocate new, flat RowSets.
* We should consider adding an UpdateCommitter to close() the previous row sets for modified locations.
* This is not important for current implementations, since they always allocate new, flat RowSets.
*/
rowSetSource.set(entry.regionIndex, entry.location.getRowSet());
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex)));
if (modifiedRegionBuilder != null) {
modifiedRegionBuilder.appendKey(entry.regionIndex);
}
Expand Down Expand Up @@ -346,7 +348,7 @@ private WritableRowSet update(final boolean initializing) {
wcs.set(entry.regionIndex, entry.location.getKey().getPartitionValue(key)));
// @formatter:on
locationSource.set(entry.regionIndex, entry.location);
rowSetSource.set(entry.regionIndex, entry.location.getRowSet());
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex)));
});
}

Expand Down Expand Up @@ -496,14 +498,14 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
Assert.neqNull(initialRowSet, "initialRowSet");
Assert.eqTrue(initialRowSet.isNonempty(), "initialRowSet.isNonempty()");
Assert.eqNull(rowSetAtLastUpdate, "rowSetAtLastUpdate");
if (initialRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
if (initialRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
throw new TableDataException(String.format(
"Location %s has initial last key %#016X, larger than maximum supported key %#016X",
location, initialRowSet.lastRowKey(),
RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
}

final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex);
final long regionFirstKey = getFirstRowKey(regionIndex);
initialRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

Expand Down Expand Up @@ -556,11 +558,11 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
// Nothing to do
return false;
}
if (updateRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
if (updateRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
throw new TableDataException(String.format(
"Location %s has updated last key %#016X, larger than maximum supported key %#016X",
location, updateRowSet.lastRowKey(),
RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
}

if (log.isDebugEnabled()) {
Expand All @@ -569,7 +571,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
.append(",TO:").append(updateRowSet.size()).endl();
}
try (final RowSet addedRowSet = updateRowSet.minus(rowSetAtLastUpdate)) {
final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex);
final long regionFirstKey = getFirstRowKey(regionIndex);
addedRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
}
Expand Down
Loading