Skip to content

Commit

Permalink
Cleanup and minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Sep 13, 2024
1 parent 944dac6 commit 912b9f2
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
reference, null, viewColumns, null);
}

private static final String LOCATION_KEY_COLUMN_NAME = "__PartitionAwareSourceTable_TableLocationKey__";
private static final String TRACKED_KEY_COLUMN_NAME = "__PartitionAwareSourceTable_TrackedTableLocationKey__";

private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final ColumnDefinition<T> columnDefinition,
Expand Down Expand Up @@ -236,15 +235,13 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
final List<String> partitionTableColumnNames = Stream.concat(
partitioningColumnDefinitions.keySet().stream(),
Stream.of(LOCATION_KEY_COLUMN_NAME, TRACKED_KEY_COLUMN_NAME)).collect(Collectors.toList());
Stream.of(TRACKED_KEY_COLUMN_NAME)).collect(Collectors.toList());
final List<ColumnSource<?>> partitionTableColumnSources =
new ArrayList<>(partitioningColumnDefinitions.size() + 1);
for (final ColumnDefinition<?> columnDefinition : partitioningColumnDefinitions.values()) {
partitionTableColumnSources.add(makePartitionSource(columnDefinition, immutableTableLocationKeys));
}
// Add the tracked and immutable keys to the table
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(immutableTableLocationKeys,
ImmutableTableLocationKey.class, null));
// Add the tracked keys to the table
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
TrackedTableLocationKey.class, null));

Expand All @@ -255,7 +252,7 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
return foundLocationKeys;
}

// Return the filtered tracked location keys
// Return the filtered keys
final Iterable<TrackedTableLocationKey> iterable =
() -> filteredColumnPartitionTable.columnIterator(TRACKED_KEY_COLUMN_NAME);
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,7 @@ private void maybeAddLocations(@NotNull final Collection<TrackedTableLocationKey
}
filterLocationKeys(locationKeys)
.parallelStream()
.forEach(lk -> {
// Unconditionally manage all locations added to the column source manager
final TableLocation tableLocation = locationProvider.getTableLocation(lk.getKey());
columnSourceManager.manage(tableLocation);
columnSourceManager.addLocation(tableLocation);
});
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.getKey())));
}

private TrackedTableLocationKey[] maybeRemoveLocations(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected void destroy() {
AbstractTableLocationProvider.this.destroy();
}
};
// TODO: understand why this seems to be needed
LivenessScopeStack.peek().manage(livenessNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public final class SingleTableLocationProvider implements TableLocationProvider
public SingleTableLocationProvider(@NotNull final TableLocation tableLocation) {
this.tableLocation = tableLocation;
trackedTableLocationKey = new TrackedTableLocationKey(tableLocation.getKey(), ttlk -> {
// no-op
// TODO: I don't think we need to do anything here, but need to think more about it
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ public synchronized void addLocation(@NotNull final TableLocation tableLocation)
if (log.isDebugEnabled()) {
log.debug().append("LOCATION_ADDED:").append(tableLocation.toString()).endl();
}
// Hold on to this table location.
manage(tableLocation);
emptyTableLocations.add(new EmptyTableLocationEntry(tableLocation));
} else {
// Duplicate location - not allowed
Expand Down

0 comments on commit 912b9f2

Please sign in to comment.