Skip to content

Commit

Permalink
Compiles now, still many problems
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Sep 13, 2024
1 parent 30910dd commit dd12240
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
}

private static final String LOCATION_KEY_COLUMN_NAME = "__PartitionAwareSourceTable_TableLocationKey__";
private static final String TRACKED_KEY_COLUMN_NAME = "__PartitionAwareSourceTable_TrackedTableLocationKey__";

private static <T> ColumnSource<? super T> makePartitionSource(@NotNull final ColumnDefinition<T> columnDefinition,
@NotNull final Collection<ImmutableTableLocationKey> locationKeys) {
Expand All @@ -228,29 +229,35 @@ protected final Collection<TrackedTableLocationKey> filterLocationKeys(
return foundLocationKeys;
}

final Collection<ImmutableTableLocationKey> immutableTableLocationKeys = foundLocationKeys.stream()
.map(TrackedTableLocationKey::getKey)
.collect(Collectors.toList());

// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
final List<String> partitionTableColumnNames = Stream.concat(
partitioningColumnDefinitions.keySet().stream(),
Stream.of(LOCATION_KEY_COLUMN_NAME)).collect(Collectors.toList());
Stream.of(LOCATION_KEY_COLUMN_NAME, TRACKED_KEY_COLUMN_NAME)).collect(Collectors.toList());
final List<ColumnSource<?>> partitionTableColumnSources =
new ArrayList<>(partitioningColumnDefinitions.size() + 1);
for (final ColumnDefinition<?> columnDefinition : partitioningColumnDefinitions.values()) {
partitionTableColumnSources.add(makePartitionSource(columnDefinition, foundLocationKeys));
partitionTableColumnSources.add(makePartitionSource(columnDefinition, immutableTableLocationKeys));
}
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
// Add the tracked and immutable keys to the table
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(immutableTableLocationKeys,
ImmutableTableLocationKey.class, null));
partitionTableColumnSources.add(ArrayBackedColumnSource.getMemoryColumnSource(foundLocationKeys,
TrackedTableLocationKey.class, null));

final Table filteredColumnPartitionTable = TableTools
.newTable(foundLocationKeys.size(), partitionTableColumnNames, partitionTableColumnSources)
.where(Filter.and(partitioningColumnFilters));
if (filteredColumnPartitionTable.size() == foundLocationKeys.size()) {
return foundLocationKeys;
}

// TODO: Not sure what to do here. Seems like there is a big disconnect between the location keys and the
// tracked location keys.

final Iterable<ImmutableTableLocationKey> iterable =
() -> filteredColumnPartitionTable.columnIterator(LOCATION_KEY_COLUMN_NAME);
// Return the filtered tracked location keys
final Iterable<TrackedTableLocationKey> iterable =
() -> filteredColumnPartitionTable.columnIterator(TRACKED_KEY_COLUMN_NAME);
return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,12 @@ protected void instrumentedRefresh() {
processNewLocationsUpdateRoot = null;
removedLocationsComitter = null;
tableLocationProvider.refresh();
try (final RowSet added = sortAndAddLocations(tableLocationProvider.getTableLocationKeys().stream()
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation))) {

final Collection<TableLocation> locations = new ArrayList<>();
tableLocationProvider.getTableLocationKeys(tlk -> {
locations.add(tableLocationProvider.getTableLocation(tlk.getKey()));
}, locationKeyMatcher);
try (final RowSet added = sortAndAddLocations(locations.stream())) {
resultRows.insert(added);
}
}
Expand Down Expand Up @@ -247,6 +250,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(TrackedTableLocationKey::getKey)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.map(PendingLocationState::new)
Expand All @@ -268,6 +272,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
final Set<ImmutableTableLocationKey> relevantRemovedLocations =
locationUpdate.getPendingRemovedLocationKeys()
.stream()
.map(TrackedTableLocationKey::getKey)
.filter(locationKeyMatcher)
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collection;

/**
Expand Down Expand Up @@ -146,7 +147,9 @@ private void initializeAvailableLocations() {
updateSourceRegistrar.addSource(locationChangePoller = new LocationChangePoller(locationBuffer));
} else {
locationProvider.refresh();
maybeAddLocations(locationProvider.getTableLocationKeys());
final Collection<TrackedTableLocationKey> tableLocationKeys = new ArrayList<>();
locationProvider.getTableLocationKeys(tableLocationKeys::add);
maybeAddLocations(tableLocationKeys);
}
});
locationsInitialized = true;
Expand All @@ -161,8 +164,9 @@ private void maybeAddLocations(@NotNull final Collection<TrackedTableLocationKey
.parallelStream()
.forEach(lk -> {
// Unconditionally manage all locations added to the column source manager
columnSourceManager.manage(lk);
columnSourceManager.addLocation(locationProvider.getTableLocation(lk));
final TableLocation tableLocation = locationProvider.getTableLocation(lk.getKey());
columnSourceManager.manage(tableLocation);
columnSourceManager.addLocation(tableLocation);
});
}

Expand Down Expand Up @@ -214,7 +218,8 @@ private LocationChangePoller(@NotNull final TableLocationSubscriptionBuffer loca

@Override
protected void instrumentedRefresh() {
try(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending()) {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
locationBuffer.processPending()) {
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//
package io.deephaven.engine.table.impl.locations;

import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.util.type.NamedImplementation;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -131,30 +131,17 @@ default void handleTableLocationKeysUpdate(
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
*
* @return A collection of keys for locations available from this provider.
*/
@NotNull
default Collection<TrackedTableLocationKey> getTableLocationKeys() {
return getTableLocationKeys(key -> true);
default void getTableLocationKeys(Consumer<TrackedTableLocationKey> consumer) {
getTableLocationKeys(consumer, key -> true);
}

/**
* <p>
* Get the provider's currently known location keys which pass the supplied filter. The locations specified by the
* keys returned may have null size - that is, they may not "exist" for application purposes.
* {@link #getTableLocation(TableLocationKey)} is guaranteed to succeed for all results.
* </p>
*
* <p>
* This call also adds a management reference to the TLK from the provide {@link LivenessManager}
* </p>
*
* @param filter A filter to apply to the location keys.
* @return A collection of keys for locations available from this provider.
* Get this provider's currently known location keys. The locations specified by the keys returned may have null
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
*/
@NotNull
Collection<TrackedTableLocationKey> getTableLocationKeys(final Predicate<TableLocationKey> filter);
void getTableLocationKeys(Consumer<TrackedTableLocationKey> consumer, Predicate<ImmutableTableLocationKey> filter);

/**
* Check if this provider knows the supplied location key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import org.jetbrains.annotations.NotNull;

import java.util.Set;
import java.util.function.Consumer;

/**
* Sub-interface of {@link TableLocationKey} to mark immutable implementations.
*/
public class TrackedTableLocationKey extends ReferenceCountedLivenessNode implements ImmutableTableLocationKey {
public class TrackedTableLocationKey extends ReferenceCountedLivenessNode {

public static TrackedTableLocationKey[] ZERO_LENGTH_TRACKED_TABLE_LOCATION_KEY_ARRAY =
new TrackedTableLocationKey[0];
Expand Down Expand Up @@ -66,25 +64,4 @@ protected void destroy() {
}
zeroCountConsumer.accept(this);
}

@Override
public <PARTITION_VALUE_TYPE extends Comparable<PARTITION_VALUE_TYPE>> PARTITION_VALUE_TYPE getPartitionValue(
@NotNull String partitionKey) {
return locationKey.getPartitionValue(partitionKey);
}

@Override
public Set<String> getPartitionKeys() {
return locationKey.getPartitionKeys();
}

@Override
public int compareTo(@NotNull TableLocationKey o) {
return locationKey.compareTo(o);
}

@Override
public boolean equals(Object o) {
return locationKey.equals(o);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.DelegatingLivenessNode;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.locations.*;
Expand All @@ -16,8 +14,8 @@
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Partial {@link TableLocationProvider} implementation for standalone use or as part of a {@link TableDataService}.
Expand All @@ -30,7 +28,7 @@
*/
public abstract class AbstractTableLocationProvider
extends SubscriptionAggregator<TableLocationProvider.Listener>
implements TableLocationProvider, DelegatingLivenessNode {
implements TableLocationProvider {

private static final Set<ImmutableTableLocationKey> EMPTY_TABLE_LOCATION_KEYS = Collections.emptySet();

Expand Down Expand Up @@ -146,11 +144,6 @@ public final ImmutableTableKey getKey() {
return tableKey;
}

@Override
public LivenessNode asLivenessNode() {
return livenessNode;
}

// ------------------------------------------------------------------------------------------------------------------
// TableLocationProvider/SubscriptionAggregator implementation
// ------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -234,7 +227,7 @@ protected void endTransaction(@NotNull final Object token) {
}
// Release the keys that were removed only after we have delivered the notifications and the
// subscribers have had a chance to process them
removedKeys.forEach(this::unmanage);
removedKeys.forEach(livenessNode::unmanage);
}
}
}
Expand Down Expand Up @@ -322,7 +315,7 @@ protected void handleTableLocationKeyRemoved(
// Remove this from the live set and un-manage it.
final TrackedTableLocationKey trackedKey = liveLocationKeys.get(locationKey);
liveLocationKeys.remove(trackedKey);
unmanage(trackedKey);
livenessNode.unmanage(trackedKey);
return;
}

Expand All @@ -340,7 +333,7 @@ protected void handleTableLocationKeyRemoved(
true)) {
onEmpty();
}
unmanage(trackedKey);
livenessNode.unmanage(trackedKey);
}
}
}
Expand All @@ -361,7 +354,7 @@ private Object observeInsert(@NotNull final TableLocationKey locationKey) {
locationCreatedRecorder = true;

final TrackedTableLocationKey trackedKey = toTrackedKey(locationKey);
manage(trackedKey);
livenessNode.manage(trackedKey);

// Add this to the live set.
liveLocationKeys.add(trackedKey);
Expand Down Expand Up @@ -413,14 +406,12 @@ protected void doInitialization() {
}

@Override
@NotNull
public final Collection<TrackedTableLocationKey> getTableLocationKeys(
final Predicate<TableLocationKey> filter) {
public void getTableLocationKeys(
final Consumer<TrackedTableLocationKey> consumer,
final Predicate<ImmutableTableLocationKey> filter) {
// Lock the live set and deliver a copy to the listener after filtering.
synchronized (liveLocationKeys) {
return liveLocationKeys.stream()
.filter(tk -> filter.test(tk.getKey()))
.collect(Collectors.toList());
liveLocationKeys.stream().filter(ttlk -> filter.test(ttlk.getKey())).forEach(consumer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.jetbrains.annotations.Nullable;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -116,7 +117,7 @@ public void subscribe(@NotNull final Listener listener) {
p.subscribe(listener);
} else {
p.refresh();
p.getTableLocationKeys().forEach(listener::handleTableLocationKeyAdded);
p.getTableLocationKeys(listener::handleTableLocationKeyAdded);
}
});
}
Expand All @@ -142,29 +143,30 @@ public TableLocationProvider ensureInitialized() {
}

@Override
public @NotNull Collection<TrackedTableLocationKey> getTableLocationKeys(Predicate<TableLocationKey> filter) {
public void getTableLocationKeys(
final Consumer<TrackedTableLocationKey> consumer,
final Predicate<ImmutableTableLocationKey> filter) {
final Set<TrackedTableLocationKey> locationKeys = new KeyedObjectHashSet<>(KeyKeyDefinition.INSTANCE);
try (final SafeCloseable ignored = CompositeTableDataServiceConsistencyMonitor.INSTANCE.start()) {
inputProviders.stream()
.map(TableLocationProvider::getTableLocationKeys)
.flatMap(Collection::stream)
.filter(tlk -> filter.test(tlk.getKey()))
.filter(x -> !locationKeys.add(x))
.findFirst()
.ifPresent(duplicateLocationKey -> {
final String overlappingProviders = inputProviders.stream()
.filter(inputProvider -> inputProvider.hasTableLocationKey(duplicateLocationKey))
.map(TableLocationProvider::getName)
.collect(Collectors.joining(","));
throw new TableDataException(
"Data Routing Configuration error: TableDataService elements overlap at location " +
duplicateLocationKey +
" in providers " + overlappingProviders +
". Full TableDataService configuration:\n" +
Formatter
.formatTableDataService(CompositeTableDataService.this.toString()));
});
return Collections.unmodifiableCollection(locationKeys);
// Add all the location keys from the providers to the set, throw an exception if there are duplicates
inputProviders.forEach(p -> p.getTableLocationKeys(tlk -> {
if (filter.test(tlk.getKey()) && !locationKeys.add(tlk)) {
final String overlappingProviders = inputProviders.stream()
.filter(inputProvider -> inputProvider.hasTableLocationKey(tlk.getKey()))
.map(TableLocationProvider::getName)
.collect(Collectors.joining(","));
throw new TableDataException(
"Data Routing Configuration error: TableDataService elements overlap at location " +
tlk +
" in providers " + overlappingProviders +
". Full TableDataService configuration:\n" +
Formatter
.formatTableDataService(CompositeTableDataService.this.toString()));

}
}));
// Consume all the location keys
locationKeys.forEach(consumer);
}
}

Expand Down
Loading

0 comments on commit dd12240

Please sign in to comment.