transformer,
- boolean expectRefreshingResults);
+ boolean expectRefreshingResults,
+ @NotNull Dependency... dependencies);
/**
*
@@ -343,10 +371,10 @@ PartitionedTable partitionedTransform(
* times.
*
* @param keyColumnValues Ordered, boxed values for the key columns in the same order as {@link #keyColumnNames()}
- * @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()}
- * @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found
* @return The {@link Table constituent} at the single row in {@link #table()} matching the {@code keyColumnValues},
* or {@code null} if no matches were found
+ * @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()}
+ * @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found
*/
@ConcurrentMethod
Table constituentFor(@NotNull Object... keyColumnValues);
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java
index 9aeea73fc3b..f9908a3dc87 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java
@@ -7,8 +7,6 @@
import java.util.Collection;
import java.util.ServiceLoader;
-import java.util.function.BinaryOperator;
-import java.util.function.UnaryOperator;
/**
* Factory for producing Deephaven engine {@link PartitionedTable} instances.
@@ -73,9 +71,9 @@ private static Creator partitionedTableCreator() {
* @param table The "raw" {@link Table table} of {@link Table tables}. Should be {@link Table#isRefreshing()
* refreshing} if any constituents are.
* @param keyColumnNames The "key" column names from {@code table}. Key columns are used in
- * {@link PartitionedTable#transform(UnaryOperator)} to validate the safety and correctness of join
- * operations and in {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator)} to
- * correlate tables that should be transformed together. Passing an ordered set is highly recommended.
+ * {@link PartitionedTable#transform transform} to validate the safety and correctness of join operations and
+ * in {@link PartitionedTable#partitionedTransform partitionedTransform} to correlate tables that should be
+ * transformed together. Passing an ordered set is highly recommended.
* @param uniqueKeys Whether the keys (key column values for a row considered as a tuple) in {@code table} are
* guaranteed to be unique
* @param constituentColumnName The "constituent" column name from {@code table}. The constituent column contains
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java
index 2d6c8f8dde6..5c600e2a63f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java
@@ -1698,6 +1698,10 @@ private static OperatorAggregationStateManager makeInitializedStateManager(
outputPosition.setValue(0);
final OperatorAggregationStateManager stateManager = stateManagerSupplier.get();
+ if (initialKeys.isEmpty()) {
+ return stateManager;
+ }
+
final ColumnSource>[] keyColumnsToInsert;
final boolean closeRowsToInsert;
final RowSequence rowsToInsert;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java
index 81776ba100b..be54278bcd9 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java
@@ -14,8 +14,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
/**
* Base {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform} and
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java
index a78458837af..796cef1b07b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java
@@ -23,8 +23,8 @@
import java.util.function.BinaryOperator;
/**
- * {@link SelectColumn} implementation to wrap transformer functions for
- * {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator) partitioned transformations}.
+ * {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#partitionedTransform
+ * partitioned transformations}.
*/
class BiTableTransformationColumn extends BaseTableTransformationColumn {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java
index ff4edfc1500..e3ecf0a6b03 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java
@@ -29,8 +29,10 @@
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.UnionSourceManager;
import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator;
+import io.deephaven.engine.updategraph.NotificationQueue.Dependency;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
+import io.deephaven.util.annotations.InternalUseOnly;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;
@@ -64,10 +66,11 @@ public class PartitionedTableImpl extends LivenessArtifact implements Partitione
private volatile WeakReference memoizedMerge;
/**
+ * @apiNote Only engine-internal tools should call this constructor directly
* @see PartitionedTableFactory#of(Table, Collection, boolean, String, TableDefinition, boolean) Factory method that
* delegates to this method
- * @apiNote Only engine-internal tools should call this constructor directly
*/
+ @InternalUseOnly
public PartitionedTableImpl(
@NotNull final Table table,
@NotNull final Collection keyColumnNames,
@@ -239,14 +242,17 @@ public PartitionedTableImpl filter(@NotNull final Collection extends Filter> f
throw new IllegalArgumentException("Unsupported filter against constituent column " + constituentColumnName
+ " found in filters: " + filters);
}
- return new PartitionedTableImpl(
- table.where(Filter.and(whereFilters)),
- keyColumnNames,
- uniqueKeys,
- constituentColumnName,
- constituentDefinition,
- constituentChangesPermitted || table.isRefreshing(),
- false);
+ return LivenessScopeStack.computeEnclosed(
+ () -> new PartitionedTableImpl(
+ table.where(Filter.and(whereFilters)),
+ keyColumnNames,
+ uniqueKeys,
+ constituentColumnName,
+ constituentDefinition,
+ constituentChangesPermitted || table.isRefreshing(),
+ false),
+ table::isRefreshing,
+ pt -> pt.table().isRefreshing());
}
@ConcurrentMethod
@@ -259,59 +265,66 @@ public PartitionedTable sort(@NotNull final Collection sortColumns)
throw new IllegalArgumentException("Unsupported sort on constituent column " + constituentColumnName
+ " found in sort columns: " + sortColumns);
}
- return new PartitionedTableImpl(
- table.sort(sortColumns),
- keyColumnNames,
- uniqueKeys,
- constituentColumnName,
- constituentDefinition,
- constituentChangesPermitted || table.isRefreshing(),
- false);
+ return LivenessScopeStack.computeEnclosed(
+ () -> new PartitionedTableImpl(
+ table.sort(sortColumns),
+ keyColumnNames,
+ uniqueKeys,
+ constituentColumnName,
+ constituentDefinition,
+ constituentChangesPermitted || table.isRefreshing(),
+ false),
+ table::isRefreshing,
+ pt -> pt.table().isRefreshing());
}
@ConcurrentMethod
@Override
- public PartitionedTableImpl transform(
+ public PartitionedTable transform(
@Nullable final ExecutionContext executionContext,
@NotNull final UnaryOperator transformer,
- final boolean expectRefreshingResults) {
- final Table resultTable;
+ final boolean expectRefreshingResults,
+ @NotNull final Dependency... dependencies) {
+ final PartitionedTable resultPartitionedTable;
final TableDefinition resultConstituentDefinition;
final LivenessManager enclosingScope = LivenessScopeStack.peek();
try (final SafeCloseable ignored1 = executionContext == null ? null : executionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
- final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(table, expectRefreshingResults);
+ final Table prepared = prepareForTransform(table, expectRefreshingResults, dependencies);
// Perform the transformation
- resultTable = asRefreshingIfNeeded.update(List.of(new TableTransformationColumn(
- constituentColumnName, executionContext,
- asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer))));
- enclosingScope.manage(resultTable);
+ final Table resultTable = prepared.update(List.of(new TableTransformationColumn(
+ constituentColumnName,
+ executionContext,
+ prepared.isRefreshing() ? transformer : assertResultsStatic(transformer))));
// Make sure we have a valid result constituent definition
final Table emptyConstituent = emptyConstituent(constituentDefinition);
final Table resultEmptyConstituent = transformer.apply(emptyConstituent);
resultConstituentDefinition = resultEmptyConstituent.getDefinition();
- }
- // Build the result partitioned table
- return new PartitionedTableImpl(
- resultTable,
- keyColumnNames,
- uniqueKeys,
- constituentColumnName,
- resultConstituentDefinition,
- constituentChangesPermitted,
- true);
+ // Build the result partitioned table
+ resultPartitionedTable = new PartitionedTableImpl(
+ resultTable,
+ keyColumnNames,
+ uniqueKeys,
+ constituentColumnName,
+ resultConstituentDefinition,
+ constituentChangesPermitted,
+ true);
+ enclosingScope.manage(resultPartitionedTable);
+ }
+ return resultPartitionedTable;
}
@Override
- public PartitionedTableImpl partitionedTransform(
+ public PartitionedTable partitionedTransform(
@NotNull final PartitionedTable other,
@Nullable final ExecutionContext executionContext,
@NotNull final BinaryOperator transformer,
- final boolean expectRefreshingResults) {
+ final boolean expectRefreshingResults,
+ @NotNull final Dependency... dependencies) {
// Check safety before doing any extra work
final UpdateGraph updateGraph = table.getUpdateGraph(other.table());
if (table.isRefreshing() || other.table().isRefreshing()) {
@@ -321,7 +334,7 @@ public PartitionedTableImpl partitionedTransform(
// Validate join compatibility
final MatchPair[] joinPairs = matchKeyColumns(this, other);
- final Table resultTable;
+ final PartitionedTable resultPartitionedTable;
final TableDefinition resultConstituentDefinition;
final LivenessManager enclosingScope = LivenessScopeStack.peek();
try (final SafeCloseable ignored1 = executionContext == null ? null : executionContext.open();
@@ -334,39 +347,55 @@ public PartitionedTableImpl partitionedTransform(
.where(new MatchFilter(Inverted, RHS_CONSTITUENT, (Object) null))
: table.join(other.table(), Arrays.asList(joinPairs), Arrays.asList(joinAdditions));
- final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(joined, expectRefreshingResults);
+ final Table prepared = prepareForTransform(joined, expectRefreshingResults, dependencies);
- resultTable = asRefreshingIfNeeded
- .update(List.of(new BiTableTransformationColumn(constituentColumnName, RHS_CONSTITUENT,
+ final Table resultTable = prepared
+ .update(List.of(new BiTableTransformationColumn(
+ constituentColumnName,
+ RHS_CONSTITUENT,
executionContext,
- asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer))))
+ prepared.isRefreshing() ? transformer : assertResultsStatic(transformer))))
.dropColumns(RHS_CONSTITUENT);
- enclosingScope.manage(resultTable);
// Make sure we have a valid result constituent definition
final Table emptyConstituent1 = emptyConstituent(constituentDefinition);
final Table emptyConstituent2 = emptyConstituent(other.constituentDefinition());
final Table resultEmptyConstituent = transformer.apply(emptyConstituent1, emptyConstituent2);
resultConstituentDefinition = resultEmptyConstituent.getDefinition();
- }
- // Build the result partitioned table
- return new PartitionedTableImpl(
- resultTable,
- keyColumnNames,
- uniqueKeys,
- constituentColumnName,
- resultConstituentDefinition,
- constituentChangesPermitted || other.constituentChangesPermitted(),
- true);
+ // Build the result partitioned table
+ resultPartitionedTable = new PartitionedTableImpl(
+ resultTable,
+ keyColumnNames,
+ uniqueKeys,
+ constituentColumnName,
+ resultConstituentDefinition,
+ constituentChangesPermitted || other.constituentChangesPermitted(),
+ true);
+ enclosingScope.manage(resultPartitionedTable);
+ }
+ return resultPartitionedTable;
}
- private Table maybeCopyAsRefreshing(Table table, boolean expectRefreshingResults) {
- if (!expectRefreshingResults || table.isRefreshing()) {
+ private static Table prepareForTransform(
+ @NotNull final Table table,
+ final boolean expectRefreshingResults,
+ @Nullable final Dependency[] dependencies) {
+
+ final boolean addDependencies = dependencies != null && dependencies.length > 0;
+ final boolean setRefreshing = (expectRefreshingResults || addDependencies) && !table.isRefreshing();
+
+ if (!addDependencies && !setRefreshing) {
return table;
}
+
final Table copied = ((QueryTable) table.coalesce()).copy();
- copied.setRefreshing(true);
+ if (setRefreshing) {
+ copied.setRefreshing(true);
+ }
+ if (addDependencies) {
+ Arrays.stream(dependencies).forEach(copied::addParentReference);
+ }
return copied;
}
@@ -468,7 +497,8 @@ private Table[] fetchConstituents(final boolean usePrev) {
/**
* Validate that {@code lhs} and {@code rhs} have compatible key columns, allowing
- * {@link #partitionedTransform(PartitionedTable, BinaryOperator)}. Compute the matching pairs of key column names.
+ * {@link PartitionedTable#partitionedTransform partitionedTransform}. Compute the matching pairs of key column
+ * names.
*
* @param lhs The first partitioned table
* @param rhs The second partitioned table
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java
index 224ffe086d5..89e789cea6e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java
@@ -13,20 +13,18 @@
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessArtifact;
+import io.deephaven.engine.liveness.LivenessScopeStack;
+import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.MatchPair;
-import io.deephaven.engine.table.PartitionedTable;
-import io.deephaven.engine.table.Table;
-import io.deephaven.engine.table.TableDefinition;
-import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.*;
import io.deephaven.engine.table.impl.select.MatchFilter;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.impl.select.SourceColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
+import io.deephaven.engine.updategraph.NotificationQueue.Dependency;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
-import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -182,12 +180,20 @@ private PartitionedTable.Proxy complexTransform(
if (refreshingResults && joinMatches != null) {
updateGraph.checkInitiateSerialTableOperation();
}
- try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
- return new PartitionedTableProxyImpl(
- target.transform(context, ct -> transformer.apply(ct, otherTable), refreshingResults),
- requireMatchingKeys,
- sanityCheckJoins);
- }
+
+ final Dependency[] dependencies = otherTable.isRefreshing()
+ ? new Dependency[] {otherTable}
+ : new Dependency[0];
+
+ return ExecutionContext.getContext().withUpdateGraph(updateGraph).apply(
+ () -> new PartitionedTableProxyImpl(
+ target.transform(
+ context,
+ ct -> transformer.apply(ct, otherTable),
+ refreshingResults,
+ dependencies),
+ requireMatchingKeys,
+ sanityCheckJoins));
}
if (other instanceof PartitionedTable.Proxy) {
final PartitionedTable.Proxy otherProxy = (PartitionedTable.Proxy) other;
@@ -197,32 +203,45 @@ private PartitionedTable.Proxy complexTransform(
if (refreshingResults) {
updateGraph.checkInitiateSerialTableOperation();
}
- try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
-
- final MatchPair[] keyColumnNamePairs = PartitionedTableImpl.matchKeyColumns(target, otherTarget);
- final DependentValidation uniqueKeys = requireMatchingKeys
- ? matchingKeysValidation(target, otherTarget, keyColumnNamePairs)
- : null;
- final DependentValidation overlappingLhsJoinKeys = sanityCheckJoins && joinMatches != null
- ? overlappingLhsJoinKeysValidation(target, joinMatches)
- : null;
- final DependentValidation overlappingRhsJoinKeys = sanityCheckJoins && joinMatches != null
- ? overlappingRhsJoinKeysValidation(otherTarget, joinMatches)
- : null;
-
- final Table validatedLhsTable = validated(target.table(), uniqueKeys, overlappingLhsJoinKeys);
- final Table validatedRhsTable = validated(otherTarget.table(), uniqueKeys, overlappingRhsJoinKeys);
- final PartitionedTable lhsToUse = maybeRewrap(validatedLhsTable, target);
- final PartitionedTable rhsToUse = maybeRewrap(validatedRhsTable, otherTarget);
-
- return new PartitionedTableProxyImpl(
- lhsToUse.partitionedTransform(rhsToUse, context, transformer, refreshingResults),
- requireMatchingKeys,
- sanityCheckJoins);
- }
+ return ExecutionContext.getContext().withUpdateGraph(updateGraph)
+ .apply(() -> LivenessScopeStack.computeEnclosed(
+ () -> {
+ final MatchPair[] keyColumnNamePairs =
+ PartitionedTableImpl.matchKeyColumns(target, otherTarget);
+ final DependentValidation uniqueKeys = requireMatchingKeys
+ ? matchingKeysValidation(target, otherTarget, keyColumnNamePairs)
+ : null;
+ final DependentValidation overlappingLhsJoinKeys =
+ sanityCheckJoins && joinMatches != null
+ ? overlappingLhsJoinKeysValidation(target, joinMatches)
+ : null;
+ final DependentValidation overlappingRhsJoinKeys =
+ sanityCheckJoins && joinMatches != null
+ ? overlappingRhsJoinKeysValidation(otherTarget, joinMatches)
+ : null;
+
+ final Table validatedLhsTable =
+ validated(target.table(), uniqueKeys, overlappingLhsJoinKeys);
+ final Table validatedRhsTable =
+ validated(otherTarget.table(), uniqueKeys, overlappingRhsJoinKeys);
+ final PartitionedTable lhsToUse = maybeRewrap(validatedLhsTable, target);
+ final PartitionedTable rhsToUse = maybeRewrap(validatedRhsTable, otherTarget);
+
+ return new PartitionedTableProxyImpl(
+ lhsToUse.partitionedTransform(rhsToUse, context, transformer,
+ refreshingResults),
+ requireMatchingKeys,
+ sanityCheckJoins);
+ },
+ () -> refreshingResults,
+ ptp -> refreshingResults));
}
- throw new IllegalArgumentException("Unexpected TableOperations input " + other
- + ", expected Table or PartitionedTable.Proxy");
+ throw onUnexpectedTableOperations(other);
+ }
+
+ private static IllegalArgumentException onUnexpectedTableOperations(@NotNull TableOperations, ?> other) {
+ return new IllegalArgumentException(String.format(
+ "Unexpected TableOperations input %s, expected Table or PartitionedTable.Proxy", other));
}
/**
@@ -246,7 +265,7 @@ private DependentValidation(
private static Table validated(
@NotNull final Table parent,
- @NotNull final DependentValidation... dependentValidationsIn) {
+ final DependentValidation... dependentValidationsIn) {
if (dependentValidationsIn.length == 0 || !parent.isRefreshing()) {
return parent;
}
@@ -389,7 +408,8 @@ private static void checkOverlappingJoinKeys(
}
}
- private static PartitionedTable maybeRewrap(@NotNull final Table table, @NotNull final PartitionedTable existing) {
+ private static PartitionedTable maybeRewrap(@NotNull final Table table,
+ @NotNull final PartitionedTable existing) {
return table == existing.table()
? existing
: new PartitionedTableImpl(table, existing.keyColumnNames(), existing.uniqueKeys(),
@@ -553,11 +573,28 @@ public PartitionedTable.Proxy aggAllBy(AggSpec spec, ColumnName... groupByColumn
public PartitionedTable.Proxy aggBy(Collection extends Aggregation> aggregations, boolean preserveEmpty,
TableOperations, ?> initialGroups, Collection extends ColumnName> groupByColumns) {
if (initialGroups == null) {
- return basicTransform(true, ct -> ct.aggBy(aggregations, preserveEmpty, null, groupByColumns));
+ return basicTransform(
+ true,
+ ct -> ct.aggBy(aggregations, preserveEmpty, null, groupByColumns));
+ }
+ if (initialGroups instanceof Table) {
+ // Force a consistent view of initial groups table to be used for all current and future constituents
+ final Table initialGroupsTable = LivenessScopeStack.computeEnclosed(
+ () -> ((Table) initialGroups).selectDistinct(groupByColumns).snapshot(),
+ () -> ((Table) initialGroups).isRefreshing(),
+ Table::isRefreshing);
+ return basicTransform(
+ true,
+ ct -> ct.aggBy(aggregations, preserveEmpty, initialGroupsTable, groupByColumns));
+ }
+ if (initialGroups instanceof PartitionedTable.Proxy) {
+ return complexTransform(
+ true,
+ initialGroups,
+ (ct, ot) -> ct.aggBy(aggregations, preserveEmpty, ot, groupByColumns),
+ null);
}
- return complexTransform(true, initialGroups,
- (ct, ot) -> ct.aggBy(aggregations, preserveEmpty, ot, groupByColumns),
- null);
+ throw onUnexpectedTableOperations(initialGroups);
}
@Override
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java
index 262d12c08c7..6a7cb5db1c1 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java
@@ -21,11 +21,10 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
-import java.util.function.UnaryOperator;
/**
- * {@link SelectColumn} implementation to wrap transformer functions for
- * {@link PartitionedTable#transform(UnaryOperator) transformations}.
+ * {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform
+ * transformations}.
*/
public class TableTransformationColumn extends BaseTableTransformationColumn {
diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
index 13c3b14af50..a2fbaee6581 100644
--- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
+++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
@@ -1035,13 +1035,26 @@ public void refreshUpdateSourceForUnitTests(@NotNull final Runnable updateSource
*/
@TestUseOnly
public boolean flushOneNotificationForUnitTests() {
+ return flushOneNotificationForUnitTests(false);
+ }
+
+ /**
+ * Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run
+ * thread, rather than this thread.
+ *
+ * @param expectOnlyUnsatisfiedNotifications Whether we expect there to be only unsatisfied notifications pending
+ * @return whether a notification was found in the queue
+ */
+ @TestUseOnly
+ public boolean flushOneNotificationForUnitTests(final boolean expectOnlyUnsatisfiedNotifications) {
Assert.assertion(unitTestMode, "unitTestMode");
final NotificationProcessor existingNotificationProcessor = notificationProcessor;
try {
this.notificationProcessor = new ControlledNotificationProcessor();
// noinspection AutoUnboxing,AutoBoxing
- return unitTestRefreshThreadPool.submit(this::flushOneNotificationForUnitTestsInternal).get();
+ return unitTestRefreshThreadPool.submit(
+ () -> flushOneNotificationForUnitTestsInternal(expectOnlyUnsatisfiedNotifications)).get();
} catch (InterruptedException | ExecutionException e) {
throw new UncheckedDeephavenException(e);
} finally {
@@ -1050,7 +1063,7 @@ public boolean flushOneNotificationForUnitTests() {
}
@TestUseOnly
- public boolean flushOneNotificationForUnitTestsInternal() {
+ private boolean flushOneNotificationForUnitTestsInternal(final boolean expectOnlyUnsatisfiedNotifications) {
final IntrusiveDoublyLinkedQueue pendingToEvaluate =
new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance());
notificationProcessor.beforeNotificationsDrained();
@@ -1077,7 +1090,12 @@ public boolean flushOneNotificationForUnitTestsInternal() {
}
if (satisfied != null) {
notificationProcessor.submit(satisfied);
- } else if (somethingWasPending) {
+ if (expectOnlyUnsatisfiedNotifications) {
+ // noinspection ThrowableNotThrown
+ Assert.statementNeverExecuted(
+ "Flushed a notification in unit test mode, but expected only unsatisfied pending notifications");
+ }
+ } else if (somethingWasPending && !expectOnlyUnsatisfiedNotifications) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted(
"Did not flush any notifications in unit test mode, yet there were outstanding notifications");
@@ -1113,7 +1131,7 @@ public Runnable flushAllNormalNotificationsForUnitTests(@NotNull final BooleanSu
final Future> flushJobFuture = unitTestRefreshThreadPool.submit(() -> {
final long deadlineNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
boolean flushed;
- while ((flushed = flushOneNotificationForUnitTestsInternal()) || !done.getAsBoolean()) {
+ while ((flushed = flushOneNotificationForUnitTestsInternal(false)) || !done.getAsBoolean()) {
if (!flushed) {
final long remainingNanos = deadlineNanoTime - System.nanoTime();
if (!controlledNotificationProcessor.blockUntilNotificationAdded(remainingNanos)) {
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java
index b99e1efac38..22c45cca47f 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java
@@ -7,8 +7,8 @@
import io.deephaven.api.SortColumn;
import io.deephaven.api.agg.Partition;
import io.deephaven.base.SleepUtil;
+import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
-import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.context.QueryScope;
@@ -25,13 +25,14 @@
import io.deephaven.engine.testutil.generator.SetGenerator;
import io.deephaven.engine.testutil.generator.SortedLongGenerator;
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
+import io.deephaven.engine.updategraph.NotificationQueue;
+import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
-import io.deephaven.io.logger.StreamLoggerImpl;
import io.deephaven.test.types.OutOfBandTest;
import io.deephaven.util.SafeCloseable;
-import io.deephaven.util.process.ProcessEnvironment;
import junit.framework.TestCase;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.experimental.categories.Category;
@@ -43,6 +44,8 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;
+import static io.deephaven.api.agg.Aggregation.AggLast;
+import static io.deephaven.api.agg.Aggregation.AggSum;
import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;
import static org.assertj.core.api.Assertions.assertThat;
@@ -215,6 +218,9 @@ public void testProxy() {
rightTable.partitionedAggBy(List.of(), true, testTable(col("Sym", "aa", "bb", "cc", "dd")), "Sym");
final PartitionedTable.Proxy rightProxy = rightPT.proxy(false, false);
+ final Table initialKeys = newTable(col("Sym", "cc", "dd", "aa", "bb"), intCol("intCol", 0, 2, 3, 4));
+ final PartitionedTable.Proxy initialKeysProxy = initialKeys.partitionBy("Sym").proxy();
+
final EvalNuggetInterface[] en = new EvalNuggetInterface[] {
new EvalNugget() {
public Table e() {
@@ -232,8 +238,21 @@ public Table e() {
leftProxy.naturalJoin(rightTable.lastBy("Sym"), "Sym").target().merge().sort("K", "Sym")),
new QueryTableTest.TableComparator(withK.naturalJoin(rightTable.lastBy("Sym"), "Sym").sort("K", "Sym"),
leftProxy.naturalJoin(rightProxy.lastBy(), "Sym").target().merge().sort("K", "Sym")),
+ new QueryTableTest.TableComparator(
+ withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), "Sym", "intCol").sort("Sym", "intCol"),
+ leftProxy.aggBy(List.of(AggLast("Sym", "K"), AggSum("doubleCol")), "intCol").target().merge()
+ .moveColumnsUp("Sym").sort("Sym", "intCol")),
+ new QueryTableTest.TableComparator(
+ withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys,
+ ColumnName.from("Sym", "intCol")).sort("Sym", "intCol"),
+ leftProxy.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys,
+ ColumnName.from("Sym", "intCol")).target().merge().sort("Sym", "intCol")),
+ new QueryTableTest.TableComparator(
+ withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys,
+ ColumnName.from("Sym", "intCol")).sort("Sym", "intCol"),
+ leftProxy.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeysProxy,
+ ColumnName.from("Sym", "intCol")).target().merge().sort("Sym", "intCol")),
};
-
for (int i = 0; i < 100; i++) {
simulateShiftAwareStep(size, random, table, columnInfo, en);
}
@@ -407,6 +426,71 @@ public void testDependencies() {
});
}
+ public void testTransformDependencies() {
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+
+ final QueryTable sourceTable = testRefreshingTable(i(1, 2, 4, 6).toTracking(),
+ col("USym", "aa", "bb", "aa", "bb"),
+ col("Sentinel", 10, 20, 40, 60));
+
+ final QueryTable extraTable = testRefreshingTable(i(0).toTracking(),
+ col("Value", "0.2"));
+ final MutableBoolean extraParentSatisfied = new MutableBoolean(false);
+ final NotificationQueue.Dependency extraParentDependency = new NotificationQueue.Dependency() {
+ @Override
+ public boolean satisfied(long step) {
+ return extraParentSatisfied.booleanValue();
+ }
+
+ @Override
+ public UpdateGraph getUpdateGraph() {
+ return updateGraph;
+ }
+
+ @Override
+ public LogOutput append(LogOutput logOutput) {
+ return logOutput.append("extra dependency");
+ }
+ };
+ extraTable.addParentReference(extraParentDependency);
+
+ final PartitionedTable partitioned = sourceTable.partitionBy("USym");
+ final PartitionedTable transformed = partitioned.transform(t -> t.join(extraTable), extraTable);
+
+ // We need to flush one notification: one for the source table because we do not require an intermediate
+ // view table in this case
+ updateGraph.runWithinUnitTestCycle(() -> {
+ // Add "dd" to source
+ addToTable(sourceTable, i(8), col("USym", "dd"), col("Sentinel", 80));
+ sourceTable.notifyListeners(i(8), i(), i());
+ TestCase.assertTrue(updateGraph.flushOneNotificationForUnitTests());
+
+ // PartitionBy has processed "dd"
+ TestCase.assertTrue(partitioned.table().satisfied(updateGraph.clock().currentStep()));
+ TestCase.assertNotNull(partitioned.constituentFor("dd"));
+
+ // Transform has not processed "dd" yet
+ TestCase.assertFalse(transformed.table().satisfied(updateGraph.clock().currentStep()));
+ TestCase.assertNull(transformed.constituentFor("dd"));
+
+ // Flush the notification for transform's internal copy() of partitioned.table()
+ TestCase.assertTrue(updateGraph.flushOneNotificationForUnitTests());
+
+ // Add a row to extra
+ addToTable(extraTable, i(1), col("Value", "0.3"));
+ extraTable.notifyListeners(i(1), i(), i());
+ TestCase.assertFalse(updateGraph.flushOneNotificationForUnitTests(true)); // Fail to update anything
+
+ extraParentSatisfied.setTrue(); // Allow updates to propagate
+ updateGraph.flushAllNormalNotificationsForUnitTests();
+
+ TestCase.assertTrue(transformed.table().satisfied(updateGraph.clock().currentStep()));
+ final Table transformedDD = transformed.constituentFor("dd");
+ TestCase.assertTrue(transformedDD.satisfied(updateGraph.clock().currentStep()));
+ TestCase.assertEquals(2, transformedDD.size());
+ });
+ }
+
public static class PauseHelper {
private final long start = System.currentTimeMillis();
private volatile boolean released = false;