Skip to content

Commit

Permalink
Refactored ATLP
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Sep 18, 2024
1 parent 912b9f2 commit e2b6fd0
Show file tree
Hide file tree
Showing 19 changed files with 290 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -18,7 +18,7 @@
/**
* Manager for ColumnSources in a Table.
*/
public interface ColumnSourceManager extends LivenessNode {
public interface ColumnSourceManager extends LivenessReferent {

/**
* Get a map of name to {@link ColumnSource} for the column sources maintained by this manager.
Expand Down Expand Up @@ -112,8 +112,7 @@ public interface ColumnSourceManager extends LivenessNode {
/**
* Remove a table location key from the sources.
*
* @return true if the location key was actually removed
* @param tableLocationKey the location key being removed
*/
boolean removeLocationKey(@NotNull TrackedTableLocationKey tableLocationKey);
void removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import io.deephaven.api.Selectable;
import io.deephaven.api.filter.Filter;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
Expand Down Expand Up @@ -222,14 +222,14 @@ private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final Co
}

@Override
protected final Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
protected final Collection<LiveSupplier<ImmutableTableLocationKey>> filterLocationKeys(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> foundLocationKeys) {
if (partitioningColumnFilters.length == 0) {
return foundLocationKeys;
}

final Collection<ImmutableTableLocationKey> immutableTableLocationKeys = foundLocationKeys.stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.collect(Collectors.toList());

// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
Expand All @@ -242,8 +242,11 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
partitionTableColumnSources.add(makePartitionSource(columnDefinition, immutableTableLocationKeys));
}
// Add the tracked keys to the table
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
TrackedTableLocationKey.class, null));
// TODO: figure out how to do this
// partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(
// foundLocationKeys,
// LiveSupplier<ImmutableTableLocationKey>.class,
// null));

final Table filteredColumnPartitionTable = TableTools
.newTable(foundLocationKeys.size(), partitionTableColumnNames, partitionTableColumnSources)
Expand All @@ -253,7 +256,7 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
}

// Return the filtered keys
final Iterable<TrackedTableLocationKey> iterable =
final Iterable<LiveSupplier<ImmutableTableLocationKey>> iterable =
() -> filteredColumnPartitionTable.columnIterator(TRACKED_KEY_COLUMN_NAME);
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
Expand Down Expand Up @@ -166,7 +167,7 @@ protected void instrumentedRefresh() {

final Collection<TableLocation> locations = new ArrayList<>();
tableLocationProvider.getTableLocationKeys(tlk -> {
locations.add(tableLocationProvider.getTableLocation(tlk.getKey()));
locations.add(tableLocationProvider.getTableLocation(tlk.get()));
}, locationKeyMatcher);
try (final RowSet added = sortAndAddLocations(locations.stream())) {
resultRows.insert(added);
Expand Down Expand Up @@ -250,7 +251,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.map(PendingLocationState::new)
Expand All @@ -272,7 +273,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
locationUpdate.getPendingRemovedLocationKeys()
.stream()
.map(TrackedTableLocationKey::getKey)
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.TableDefinition;
Expand Down Expand Up @@ -148,33 +149,40 @@ private void initializeAvailableLocations() {
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
locationProvider.refresh();
final Collection<TrackedTableLocationKey> tableLocationKeys = new ArrayList<>();
locationProvider.getTableLocationKeys(tableLocationKeys::add);
maybeAddLocations(tableLocationKeys);
final Collection<LiveSupplier<ImmutableTableLocationKey>> keySuppliers = new ArrayList<>();
// Manage each of the location keys as we see them (since the TLP is not guaranteeing them outside
// the callback)
locationProvider.getTableLocationKeys(ttlk -> {
manage(ttlk);
keySuppliers.add(ttlk);
});
maybeAddLocations(keySuppliers);
// Now we can un-manage the location keys
keySuppliers.forEach(this::unmanage);
}
});
locationsInitialized = true;
}
}

private void maybeAddLocations(@NotNull final Collection<TrackedTableLocationKey> locationKeys) {
private void maybeAddLocations(@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> locationKeys) {
if (locationKeys.isEmpty()) {
return;
}
filterLocationKeys(locationKeys)
.parallelStream()
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.getKey())));
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk.get())));
}

private TrackedTableLocationKey[] maybeRemoveLocations(
@NotNull final Collection<TrackedTableLocationKey> removedKeys) {
private void maybeRemoveLocations(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
if (removedKeys.isEmpty()) {
return TrackedTableLocationKey.ZERO_LENGTH_TRACKED_TABLE_LOCATION_KEY_ARRAY;
return;
}

return filterLocationKeys(removedKeys).stream()
.filter(columnSourceManager::removeLocationKey)
.toArray(TrackedTableLocationKey[]::new);
filterLocationKeys(removedKeys).stream()
.map(LiveSupplier::get)
.forEach(columnSourceManager::removeLocationKey);
}

private void initializeLocationSizes() {
Expand Down Expand Up @@ -255,8 +263,8 @@ protected void onRefreshError(@NotNull final Exception error) {
* {@link TableLocationProvider}, but not yet incorporated into the table
* @return A sub-collection of the input
*/
protected Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
protected Collection<LiveSupplier<ImmutableTableLocationKey>> filterLocationKeys(
@NotNull final Collection<LiveSupplier<ImmutableTableLocationKey>> foundLocationKeys) {
return foundLocationKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.deephaven.api.SortColumn;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.BasicDataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.io.log.impl.LogOutputStringImpl;
Expand All @@ -22,7 +22,7 @@
* location allows access to columns, size, and possibly other metadata for a single partition that may be included in a
* source table.
*/
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessNode {
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessReferent {

/**
* Listener interface for anything that wants to know about changes to a location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_
* @return An immutable version of this key
*/
ImmutableTableLocationKey makeImmutable();

/**
* Release any cached data associated with this key. This would only be called at EOL for this key.
*/
default void clear() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.table.impl.locations;

import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -43,24 +44,24 @@ interface Listener extends BasicTableDataListener {
void endTransaction(@NotNull Object token);

/**
* Notify the listener of a {@link TrackedTableLocationKey} encountered while initiating or maintaining the
* location subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
* guaranteed. This addition is not part of any transaction, and is equivalent to
* Notify the listener of a {@link LiveSupplier<ImmutableTableLocationKey>} encountered while initiating or
* maintaining the location subscription. This should occur at most once per location, but the order of delivery
* is <i>not</i> guaranteed. This addition is not part of any transaction, and is equivalent to
* {@code handleTableLocationKeyAdded(tableLocationKey, null);} by default.
*
* @param tableLocationKey The new table location key.
*/
void handleTableLocationKeyAdded(@NotNull TrackedTableLocationKey tableLocationKey);
void handleTableLocationKeyAdded(@NotNull LiveSupplier<ImmutableTableLocationKey> tableLocationKey);

/**
* Notify the listener of a {@link TrackedTableLocationKey} that has been removed. This removal is not part of
* any transaction, and is equivalent to {@code handleTableLocationKeyRemoved(tableLocationKey, null);} by
* default.
* Notify the listener of a {@link LiveSupplier<ImmutableTableLocationKey>} that has been removed. This removal
* is not part of any transaction, and is equivalent to
* {@code handleTableLocationKeyRemoved(tableLocationKey, null);} by default.
*
* @param tableLocationKey The table location key that was removed.
*/
@SuppressWarnings("unused")
void handleTableLocationKeyRemoved(@NotNull TrackedTableLocationKey tableLocationKey);
void handleTableLocationKeyRemoved(@NotNull LiveSupplier<ImmutableTableLocationKey> tableLocationKey);

/**
* <p>
Expand All @@ -73,8 +74,8 @@ interface Listener extends BasicTableDataListener {
* @param removedKeys Collection of table location keys that were removed.
*/
default void handleTableLocationKeysUpdate(
@NotNull Collection<TrackedTableLocationKey> addedKeys,
@NotNull Collection<TrackedTableLocationKey> removedKeys) {
@NotNull Collection<LiveSupplier<ImmutableTableLocationKey>> addedKeys,
@NotNull Collection<LiveSupplier<ImmutableTableLocationKey>> removedKeys) {
removedKeys.forEach(this::handleTableLocationKeyRemoved);
addedKeys.forEach(this::handleTableLocationKeyAdded);
}
Expand Down Expand Up @@ -131,17 +132,24 @@ default void handleTableLocationKeysUpdate(
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
*
* @param consumer A consumer to receive the location keys
*/
default void getTableLocationKeys(Consumer<TrackedTableLocationKey> consumer) {
default void getTableLocationKeys(Consumer<LiveSupplier<ImmutableTableLocationKey>> consumer) {
getTableLocationKeys(consumer, key -> true);
}

/**
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
*
* @param consumer A consumer to receive the location keys
* @param filter A filter to apply to the location keys before the consumer is called
*/
void getTableLocationKeys(Consumer<TrackedTableLocationKey> consumer, Predicate<ImmutableTableLocationKey> filter);
void getTableLocationKeys(
Consumer<LiveSupplier<ImmutableTableLocationKey>> consumer,
Predicate<ImmutableTableLocationKey> filter);

/**
* Check if this provider knows the supplied location key.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,8 @@ public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
private void destroy() {
handleUpdate(null, System.currentTimeMillis());
clearColumnLocations();

// The key may be holding resources that can be cleared.
tableLocationKey.clear();
}
}
Loading

0 comments on commit e2b6fd0

Please sign in to comment.