Skip to content

Commit

Permalink
For review only, does not compile :(
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Sep 12, 2024
1 parent 09e2b6e commit 30910dd
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
//
package io.deephaven.engine.table.impl;

import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -19,7 +18,7 @@
/**
* Manager for ColumnSources in a Table.
*/
public interface ColumnSourceManager extends LivenessReferent {
public interface ColumnSourceManager extends LivenessNode {

/**
* Get a map of name to {@link ColumnSource} for the column sources maintained by this manager.
Expand Down Expand Up @@ -116,5 +115,5 @@ public interface ColumnSourceManager extends LivenessReferent {
* @return true if the location key was actually removed
* @param tableLocationKey the location key being removed
*/
boolean removeLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
boolean removeLocationKey(@NotNull TrackedTableLocationKey tableLocationKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.api.filter.Filter;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.TrackedTableLocationKey;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
Expand Down Expand Up @@ -221,11 +222,12 @@ private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final Co
}

@Override
protected final Collection<ImmutableTableLocationKey> filterLocationKeys(
@NotNull final Collection<ImmutableTableLocationKey> foundLocationKeys) {
protected final Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
if (partitioningColumnFilters.length == 0) {
return foundLocationKeys;
}

// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
final List<String> partitionTableColumnNames = Stream.concat(
partitioningColumnDefinitions.keySet().stream(),
Expand All @@ -243,6 +245,10 @@ protected final Collection<ImmutableTableLocationKey> filterLocationKeys(
if (filteredColumnPartitionTable.size() == foundLocationKeys.size()) {
return foundLocationKeys;
}

// TODO: Not sure what to do here. Seems like there is a big disconnect between the location keys and the
// tracked location keys.

final Iterable<ImmutableTableLocationKey> iterable =
() -> filteredColumnPartitionTable.columnIterator(LOCATION_KEY_COLUMN_NAME);
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,14 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
}

private void processPendingLocations(final boolean notifyListeners) {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = subscriptionBuffer.processPending();
final RowSet removed = processRemovals(locationUpdate);
final RowSet added = processAdditions(locationUpdate);
final RowSet removed;
final RowSet added;

try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
}

resultRows.update(added, removed);
if (notifyListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationProvider;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.locations.impl.TableLocationSubscriptionBuffer;
Expand Down Expand Up @@ -140,11 +138,11 @@ private void initializeAvailableLocations() {
if (isRefreshing()) {
final TableLocationSubscriptionBuffer locationBuffer =
new TableLocationSubscriptionBuffer(locationProvider);
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending();

maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
locationProvider.refresh();
Expand All @@ -155,24 +153,28 @@ private void initializeAvailableLocations() {
}
}

private void maybeAddLocations(@NotNull final Collection<ImmutableTableLocationKey> locationKeys) {
private void maybeAddLocations(@NotNull final Collection<TrackedTableLocationKey> locationKeys) {
if (locationKeys.isEmpty()) {
return;
}
filterLocationKeys(locationKeys)
.parallelStream()
.forEach(lk -> columnSourceManager.addLocation(locationProvider.getTableLocation(lk)));
.forEach(lk -> {
// Unconditionally manage all locations added to the column source manager
columnSourceManager.manage(lk);
columnSourceManager.addLocation(locationProvider.getTableLocation(lk));
});
}

private ImmutableTableLocationKey[] maybeRemoveLocations(
@NotNull final Collection<ImmutableTableLocationKey> removedKeys) {
private TrackedTableLocationKey[] maybeRemoveLocations(
@NotNull final Collection<TrackedTableLocationKey> removedKeys) {
if (removedKeys.isEmpty()) {
return ImmutableTableLocationKey.ZERO_LENGTH_IMMUTABLE_TABLE_LOCATION_KEY_ARRAY;
return TrackedTableLocationKey.ZERO_LENGTH_TRACKED_TABLE_LOCATION_KEY_ARRAY;
}

return filterLocationKeys(removedKeys).stream()
.filter(columnSourceManager::removeLocationKey)
.toArray(ImmutableTableLocationKey[]::new);
.toArray(TrackedTableLocationKey[]::new);
}

private void initializeLocationSizes() {
Expand Down Expand Up @@ -212,10 +214,10 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca

@Override
protected void instrumentedRefresh() {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
final ImmutableTableLocationKey[] removedKeys =
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
try(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}

// This class previously had functionality to notify "location listeners", but it was never used.
// Resurrect from git history if needed.
Expand Down Expand Up @@ -252,8 +254,8 @@ protected void onRefreshError(@NotNull final Exception error) {
* {@link TableLocationProvider}, but not yet incorporated into the table
* @return A sub-collection of the input
*/
protected Collection<ImmutableTableLocationKey> filterLocationKeys(
@NotNull final Collection<ImmutableTableLocationKey> foundLocationKeys) {
protected Collection<TrackedTableLocationKey> filterLocationKeys(
@NotNull final Collection<TrackedTableLocationKey> foundLocationKeys) {
return foundLocationKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
*/
@Immutable
public interface ImmutableTableLocationKey extends TableLocationKey {

ImmutableTableLocationKey[] ZERO_LENGTH_IMMUTABLE_TABLE_LOCATION_KEY_ARRAY = new ImmutableTableLocationKey[0];

@FinalDefault
default ImmutableTableLocationKey makeImmutable() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.api.SortColumn;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.table.BasicDataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.io.log.impl.LogOutputStringImpl;
Expand All @@ -21,7 +22,7 @@
* location allows access to columns, size, and possibly other metadata for a single partition that may be included in a
* source table.
*/
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState {
public interface TableLocation extends NamedImplementation, LogOutputAppendable, TableLocationState, LivenessNode {

/**
* Listener interface for anything that wants to know about changes to a location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
//
package io.deephaven.engine.table.impl.locations;

import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collection;
import java.util.function.Predicate;

/**
* Discovery utility for {@link TableLocation}s for a given table.
Expand Down Expand Up @@ -41,23 +43,24 @@ interface Listener extends BasicTableDataListener {
void endTransaction(@NotNull Object token);

/**
* Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
* subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
* Notify the listener of a {@link TrackedTableLocationKey} encountered while initiating or maintaining the
* location subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
* guaranteed. This addition is not part of any transaction, and is equivalent to
* {@code handleTableLocationKeyAdded(tableLocationKey, null);} by default.
*
* @param tableLocationKey The new table location key.
*/
void handleTableLocationKeyAdded(@NotNull ImmutableTableLocationKey tableLocationKey);
void handleTableLocationKeyAdded(@NotNull TrackedTableLocationKey tableLocationKey);

/**
* Notify the listener of a {@link TableLocationKey} that has been removed. This removal is not part of any
* transaction, and is equivalent to {@code handleTableLocationKeyRemoved(tableLocationKey, null);} by default.
* Notify the listener of a {@link TrackedTableLocationKey} that has been removed. This removal is not part of
* any transaction, and is equivalent to {@code handleTableLocationKeyRemoved(tableLocationKey, null);} by
* default.
*
* @param tableLocationKey The table location key that was removed.
*/
@SuppressWarnings("unused")
void handleTableLocationKeyRemoved(@NotNull ImmutableTableLocationKey tableLocationKey);
void handleTableLocationKeyRemoved(@NotNull TrackedTableLocationKey tableLocationKey);

/**
* <p>
Expand All @@ -70,8 +73,8 @@ interface Listener extends BasicTableDataListener {
* @param removedKeys Collection of table location keys that were removed.
*/
default void handleTableLocationKeysUpdate(
@NotNull Collection<ImmutableTableLocationKey> addedKeys,
@NotNull Collection<ImmutableTableLocationKey> removedKeys) {
@NotNull Collection<TrackedTableLocationKey> addedKeys,
@NotNull Collection<TrackedTableLocationKey> removedKeys) {
removedKeys.forEach(this::handleTableLocationKeyRemoved);
addedKeys.forEach(this::handleTableLocationKeyAdded);
}
Expand Down Expand Up @@ -132,7 +135,26 @@ default void handleTableLocationKeysUpdate(
* @return A collection of keys for locations available from this provider.
*/
@NotNull
Collection<ImmutableTableLocationKey> getTableLocationKeys();
default Collection<TrackedTableLocationKey> getTableLocationKeys() {
return getTableLocationKeys(key -> true);
}

/**
* <p>
* Get the provider's currently known location keys which pass the supplied filter. The locations specified by the
* keys returned may have null size - that is, they may not "exist" for application purposes.
* {@link #getTableLocation(TableLocationKey)} is guaranteed to succeed for all results.
* </p>
*
* <p>
* This call also adds a management reference to the TLK from the provide {@link LivenessManager}
* </p>
*
* @param filter A filter to apply to the location keys.
* @return A collection of keys for locations available from this provider.
*/
@NotNull
Collection<TrackedTableLocationKey> getTableLocationKeys(final Predicate<TableLocationKey> filter);

/**
* Check if this provider knows the supplied location key.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import org.jetbrains.annotations.NotNull;

import java.util.Set;
import java.util.function.Consumer;

/**
* Sub-interface of {@link TableLocationKey} to mark immutable implementations.
*/
public class TrackedTableLocationKey extends ReferenceCountedLivenessNode implements ImmutableTableLocationKey {

public static TrackedTableLocationKey[] ZERO_LENGTH_TRACKED_TABLE_LOCATION_KEY_ARRAY =
new TrackedTableLocationKey[0];

final ImmutableTableLocationKey locationKey;
final Consumer<TrackedTableLocationKey> zeroCountConsumer;

private TableLocation tableLocation;

public TrackedTableLocationKey(
final ImmutableTableLocationKey locationKey,
final Consumer<TrackedTableLocationKey> zeroCountConsumer) {
super(false);

this.locationKey = locationKey;
this.zeroCountConsumer = zeroCountConsumer;

tableLocation = null;
}

public ImmutableTableLocationKey getKey() {
return locationKey;
}

/**
* This {@link TrackedTableLocationKey} should manage the given {@link TableLocation} and store a reference to it.
*
* @param tableLocation The {@link TableLocation} to manage.
*/
public void manageTableLocation(TableLocation tableLocation) {
Assert.eqNull(this.tableLocation, "this.tableLocation");
this.tableLocation = tableLocation;
manage(tableLocation);
}

/**
* Unmanage the {@link TableLocation} and the release the reference.
*/
public void unmanageTableLocation() {
Assert.neqNull(this.tableLocation, "this.tableLocation");
unmanage(tableLocation);
tableLocation = null;
}

@Override
protected void destroy() {
super.destroy();
if (tableLocation != null) {
unmanageTableLocation();
}
zeroCountConsumer.accept(this);
}

@Override
public <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_TYPE getPartitionValue(
@NotNull String partitionKey) {
return locationKey.getPartitionValue(partitionKey);
}

@Override
public Set<String> getPartitionKeys() {
return locationKey.getPartitionKeys();
}

@Override
public int compareTo(@NotNull TableLocationKey o) {
return locationKey.compareTo(o);
}

@Override
public boolean equals(Object o) {
return locationKey.equals(o);
}
}
Loading

0 comments on commit 30910dd

Please sign in to comment.