Skip to content

Commit

Permalink
Incorporated external PR to update PartitioningColumnDataIndex for re…
Browse files Browse the repository at this point in the history
…freshing tables.
  • Loading branch information
lbooker42 committed Aug 30, 2024
1 parent 893336f commit 68e4546
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
import gnu.trove.map.hash.TObjectIntHashMap;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
Expand Down Expand Up @@ -56,7 +51,7 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
/** Provides fast lookup from keys to positions in the index table **/
private final TObjectIntHashMap<Object> keyPositionMap;

private final ModifiedColumnSet upstreamLocationModified;
private final ModifiedColumnSet upstreamKeyModified;
private final ModifiedColumnSet upstreamRowSetModified;
private final ModifiedColumnSet downstreamRowSetModified;

Expand All @@ -65,7 +60,7 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
* {@link RegionedColumnSourceManager} at a time when there cannot be any concurrent "refresh" behavior, and so we
* can safely use the {@link RegionedColumnSourceManager#locationTable() location table} without snapshotting or
* considering previous values.
*
*
* @param keyColumnName The key column name
* @param keySource The key source in the indexed table
* @param columnSourceManager The column source manager that provides locations and region indexes
Expand Down Expand Up @@ -113,7 +108,7 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
if (locationTable.isRefreshing()) {
// No need to track previous values; we mutate the index table's RowSets in-place, and we never move a key.
indexTable.getRowSet().writableCast().initializePreviousValue();
upstreamLocationModified = locationTable.newModifiedColumnSet(columnSourceManager.locationColumnName());
upstreamKeyModified = locationTable.newModifiedColumnSet(keyColumnName);
upstreamRowSetModified = locationTable.newModifiedColumnSet(columnSourceManager.rowSetColumnName());
downstreamRowSetModified = indexTable.newModifiedColumnSet(rowSetColumnName());
final TableUpdateListener tableListener = new BaseTable.ListenerImpl(String.format(
Expand All @@ -126,7 +121,7 @@ public void onUpdate(@NotNull final TableUpdate upstream) {
locationTable.addUpdateListener(tableListener);
manage(indexTable);
} else {
upstreamLocationModified = null;
upstreamKeyModified = null;
upstreamRowSetModified = null;
downstreamRowSetModified = null;
}
Expand All @@ -138,29 +133,30 @@ private synchronized void processUpdate(
if (upstream.empty()) {
return;
}
if (upstream.removed().isNonempty()) {
throw new UnsupportedOperationException("Removed locations are not currently supported");
}
if (upstream.shifted().nonempty()) {
throw new UnsupportedOperationException("Shifted locations are not currently supported");
}
if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamLocationModified)) {
throw new UnsupportedOperationException("Modified locations are not currently supported");
if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamKeyModified)) {
throw new UnsupportedOperationException("Modified location keys are not currently supported");
}
Assert.assertion(initializing || isRefreshing(), "initializing || isRefreshing()");

final int previousSize = keyPositionMap.size();
final RowSetBuilderRandom modifiedBuilder = initializing ? null : RowSetFactory.builderRandom();
final RowSetBuilderRandom modifiedPositionBuilder = initializing ? null : RowSetFactory.builderRandom();

if (upstream.added().isNonempty()) {
upstream.added().forAllRowKeys((final long locationRowKey) -> handleKey(
locationRowKey, false, previousSize, modifiedBuilder));
if (upstream.removed().isNonempty()) {
Assert.eqFalse(initializing, "initializing");
upstream.removed().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.REMOVE, modifiedPositionBuilder));
}

if (upstream.modified().isNonempty() && upstream.modifiedColumnSet().containsAny(upstreamRowSetModified)) {
Assert.eqFalse(initializing, "initializing");
upstream.modified().forAllRowKeys((final long locationRowKey) -> handleKey(
locationRowKey, true, previousSize, modifiedBuilder));
upstream.modified().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.MODIFY, modifiedPositionBuilder));
}
if (upstream.added().isNonempty()) {
upstream.added().forAllRowKeys((final long locationRowKey) -> handleLocation(
locationRowKey, ChangeType.ADD, modifiedPositionBuilder));
}

final int newSize = keyPositionMap.size();
Expand All @@ -172,40 +168,89 @@ private synchronized void processUpdate(
return;
}

// Send the downstream updates to any listeners of the index table
final WritableRowSet modified = modifiedBuilder.build();
final WritableRowSet modified = modifiedPositionBuilder.build();
if (previousSize == newSize && modified.isEmpty()) {
modified.close();
return;
}

final RowSetBuilderSequential removedPositionsBuilder = RowSetFactory.builderSequential();
final RowSetBuilderSequential resurrectedPositionsBuilder = RowSetFactory.builderSequential();
modified.forAllRowKeys((final long pos) -> {
final RowSet indexRowSet = indexRowSetSource.get(pos);
// noinspection DataFlowIssue
if (indexRowSet.isEmpty()) {
removedPositionsBuilder.appendKey(pos);
} else if (indexRowSet.trackingCast().prev().isEmpty()) {
resurrectedPositionsBuilder.appendKey(pos);
}
});

final WritableRowSet added = RowSetFactory.fromRange(previousSize, newSize - 1);
final RowSet removed = removedPositionsBuilder.build();
modified.remove(removed);
try (final RowSet resurrected = resurrectedPositionsBuilder.build()) {
added.insert(resurrected);
modified.remove(resurrected);
}

// Send the downstream updates to any listeners of the index table
final TableUpdate downstream = new TableUpdateImpl(
RowSetFactory.fromRange(previousSize, newSize - 1),
RowSetFactory.empty(),
added,
removed,
modified,
RowSetShiftData.EMPTY,
modified.isNonempty() ? downstreamRowSetModified : ModifiedColumnSet.EMPTY);
indexTable.notifyListeners(downstream);
}

private void handleKey(
private enum ChangeType {
// @formatter:off
ADD("Added"),
REMOVE("Removed"),
MODIFY("Modified");
// @formatter:on

private final String actionLabel;

ChangeType(@NotNull final String actionLabel) {
this.actionLabel = actionLabel;
}
}

private void handleLocation(
final long locationRowKey,
final boolean isModify,
final int previousSize,
@Nullable final RowSetBuilderRandom modifiedBuilder) {
@NotNull final ChangeType changeType,
@Nullable final RowSetBuilderRandom modifiedPositionBuilder) {
final KEY_TYPE locationKey = locationTableKeySource.get(locationRowKey);
final Object locationKeyReinterpreted = locationTableKeySourceReinterpreted.get(locationRowKey);
final RowSet regionRowSet = locationTableRowSetSource.get(locationRowKey);
if (regionRowSet == null) {
throw new IllegalStateException(String.format("Null row set found at location index %d", locationRowKey));

final RowSet currentRegionRowSet = changeType == ChangeType.REMOVE
? null
: locationTableRowSetSource.get(locationRowKey);
final RowSet previousRegionRowSet = changeType == ChangeType.ADD
? null
: locationTableRowSetSource.getPrev(locationRowKey);

if (changeType != ChangeType.REMOVE && (currentRegionRowSet == null || currentRegionRowSet.isEmpty())) {
throw new IllegalStateException(String.format(
"%s partition (index=%d, key=%s): Unexpected null or empty current row set",
changeType.actionLabel, locationRowKey, locationKey));
}
if (changeType != ChangeType.ADD && (previousRegionRowSet == null || previousRegionRowSet.isEmpty())) {
throw new IllegalStateException(String.format(
"%s partition (index=%d, key=%s): Unexpected null or empty previous row set",
changeType.actionLabel, locationRowKey, locationKey));
}

final long regionFirstRowKey = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey));
// Test using the (maybe) reinterpreted key
final int pos = keyPositionMap.get(locationKeyReinterpreted);

// Inserting a new bucket
if (pos == KEY_NOT_FOUND) {
if (isModify) {
throw new IllegalStateException(String.format("Modified partition key %s not found", locationKey));
if (changeType == ChangeType.REMOVE || changeType == ChangeType.MODIFY) {
throw new IllegalStateException(String.format("%s partition (index=%d, key=%s): Key not found",
changeType.actionLabel, locationRowKey, locationKey));
}
final int addedKeyPos = keyPositionMap.size();
// Store the (maybe) reinterpreted key in the lookup hashmap.
Expand All @@ -216,21 +261,27 @@ private void handleKey(
indexKeySource.set(addedKeyPos, locationKey);

indexRowSetSource.ensureCapacity(addedKeyPos + 1);
indexRowSetSource.set(addedKeyPos, regionRowSet.shift(regionFirstRowKey).toTracking());
} else {
// noinspection DataFlowIssue
final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast();
try (final WritableRowSet shiftedRowSet = regionRowSet.shift(regionFirstRowKey)) {
// We could assert that:
// 1. an added location is non-overlapping with the key's existing row set
// 2. a modified location's current row set is a superset of its previous row set
// 3. a modified location's previous row set is a subset of the key's existing row set
existingRowSet.insert(shiftedRowSet);
}
indexRowSetSource.set(addedKeyPos, currentRegionRowSet.copy().toTracking());
return;
}

if (modifiedBuilder != null && pos < previousSize) {
modifiedBuilder.addKey(pos);
}
// Updating an existing bucket
// noinspection DataFlowIssue
final WritableRowSet existingRowSet = indexRowSetSource.get(pos).writableCast();
// We _could_ assert that:
// 1. An added location is non-overlapping with the key's existing row set
// 2. A modified location's current row set is a superset of its previous row set (with existing RCSM)
// 3. A removed or modified location's previous row set is a subset of the key's existing row set
if (previousRegionRowSet != null) {
existingRowSet.remove(previousRegionRowSet);
}
if (currentRegionRowSet != null) {
existingRowSet.insert(currentRegionRowSet);
}
if (modifiedPositionBuilder != null) {
// Note that once done processing everything, we're going to adjust this to pull out transitions _from_
// empty as adds and _to_ empty as removes.
modifiedPositionBuilder.addKey(pos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK;
import static io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource.getFirstRowKey;

/**
* Manage column sources made up of regions in their own row key address space.
*/
Expand Down Expand Up @@ -369,10 +372,10 @@ private TableUpdateImpl update(final boolean initializing) {
if (entry.pollUpdates(addedRowSetBuilder)) {
// Changes were detected, update the row set in the table and mark the row/column as modified.
/*
* Since TableLocationState.getRowSet() returns a copy(), we own entry.rowSetAtLastUpdate and can
* propagate it without making another copy().
* We should consider adding an UpdateCommitter to close() the previous row sets for modified locations.
* This is not important for current implementations, since they always allocate new, flat RowSets.
*/
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate);
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex)));
if (modifiedRegionBuilder != null) {
modifiedRegionBuilder.appendKey(entry.regionIndex);
}
Expand Down Expand Up @@ -426,7 +429,7 @@ private TableUpdateImpl update(final boolean initializing) {
wcs.set(entry.regionIndex, entry.location.getKey().getPartitionValue(key)));
// @formatter:on
locationSource.set(entry.regionIndex, entry.location);
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate);
rowSetSource.set(entry.regionIndex, entry.rowSetAtLastUpdate.shift(getFirstRowKey(entry.regionIndex)));
addedRegionBuilder.appendKey(entry.regionIndex);
});
}
Expand Down Expand Up @@ -584,14 +587,14 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
Assert.neqNull(initialRowSet, "initialRowSet");
Assert.eqTrue(initialRowSet.isNonempty(), "initialRowSet.isNonempty()");
Assert.eqNull(rowSetAtLastUpdate, "rowSetAtLastUpdate");
if (initialRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
if (initialRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
throw new TableDataException(String.format(
"Location %s has initial last key %#016X, larger than maximum supported key %#016X",
location, initialRowSet.lastRowKey(),
RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
}

final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex);
final long regionFirstKey = getFirstRowKey(regionIndex);
initialRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

Expand Down Expand Up @@ -644,11 +647,11 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
// Nothing to do
return false;
}
if (updateRowSet.lastRowKey() > RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
if (updateRowSet.lastRowKey() > ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK) {
throw new TableDataException(String.format(
"Location %s has updated last key %#016X, larger than maximum supported key %#016X",
location, updateRowSet.lastRowKey(),
RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK));
}

if (log.isDebugEnabled()) {
Expand All @@ -657,7 +660,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
.append(",TO:").append(updateRowSet.size()).endl();
}
try (final RowSet addedRowSet = updateRowSet.minus(rowSetAtLastUpdate)) {
final long regionFirstKey = RegionedColumnSource.getFirstRowKey(regionIndex);
final long regionFirstKey = getFirstRowKey(regionIndex);
addedRowSet.forAllRowKeyRanges((subRegionFirstKey, subRegionLastKey) -> addedRowSetBuilder
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
}
Expand Down

0 comments on commit 68e4546

Please sign in to comment.