Skip to content

Commit

Permalink
Add optional extra dependencies to PartitionedTable.transform and Par…
Browse files Browse the repository at this point in the history
…titionedTable.p (#4889)
  • Loading branch information
rcaudy authored Nov 29, 2023
1 parent 6a8cc49 commit 0974068
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.updategraph.NotificationQueue.Dependency;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -27,11 +28,11 @@
* <p>
* A partitioned table is a {@link Table} with one or more columns containing non-{@code null}, like-defined constituent
* tables, optionally with "key" columns defined to allow
* {@link #partitionedTransform(PartitionedTable, BinaryOperator)} or proxied joins with other like-keyed partitioned
* tables.
* {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...)} or proxied joins with other like-keyed
* partitioned tables.
* <p>
* Note that partitioned tables should {@link io.deephaven.engine.updategraph.NotificationQueue.Dependency depend} on
* and {@link io.deephaven.engine.liveness.LivenessManager#manage(LivenessReferent) manage} their
* Note that partitioned tables should {@link Dependency depend} on and
* {@link io.deephaven.engine.liveness.LivenessManager#manage(LivenessReferent) manage} their
* {@link Table#isRefreshing() refreshing} constituents.
*/
public interface PartitionedTable extends LivenessNode, LogOutputAppendable {
Expand Down Expand Up @@ -151,12 +152,13 @@ default Proxy proxy() {
* PartitionedTable.
* <p>
* Each operation thus applied will produce a new PartitionedTable with the results as in
* {@link #transform(UnaryOperator)} or {@link #partitionedTransform(PartitionedTable, BinaryOperator)}, and return
* a new proxy to that PartitionedTable.
* {@link #transform(UnaryOperator, Dependency...)} or
* {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...)}, and return a new proxy to that
* PartitionedTable.
*
* @param requireMatchingKeys Whether to ensure that both partitioned tables have all the same keys present when a
* proxied operation uses {@code this} and another {@link PartitionedTable} as inputs for a
* {@link #partitionedTransform(PartitionedTable, BinaryOperator) partitioned transform}
* {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...) partitioned transform}
* @param sanityCheckJoinOperations Whether to check that proxied join operations will only find a given join key in
* one constituent table for {@code this} and the {@link Table table} argument if it is also a
* {@link PartitionedTable.Proxy proxy}
Expand Down Expand Up @@ -211,17 +213,25 @@ default Proxy proxy() {
* PartitionedTable's {@link #table() underlying table} is refreshing.
* <p>
*
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for an
* empty input table. It is required to install an ExecutionContext to access any
* QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}.
*
* @param transformer The {@link UnaryOperator} to apply to all constituent {@link Table tables}
* @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added
* or modified constituents during update processing; use this when {@code transformer} uses additional
* {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if {@code !table().isRefreshing()} and
* {@code transformer} produces a refreshing result for any constituent
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for an
* empty input table. It is required to install an ExecutionContext to access any
* QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}.
*/
default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
return transform(ExecutionContext.getContextToRecord(), transformer, table().isRefreshing());
default PartitionedTable transform(
@NotNull final UnaryOperator<Table> transformer,
@NotNull final Dependency... dependencies) {
return transform(
ExecutionContext.getContextToRecord(),
transformer,
table().isRefreshing(),
dependencies);
}

/**
Expand All @@ -239,6 +249,9 @@ default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
* backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that
* might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying
* {@code false} will result in exceptions.
* @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added
* or modified constituents during update processing; use this when {@code transformer} uses additional
* {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !expectRefreshingResults} and {@code transformer} produces a refreshing
Expand All @@ -247,7 +260,8 @@ default PartitionedTable transform(@NotNull UnaryOperator<Table> transformer) {
PartitionedTable transform(
@Nullable ExecutionContext executionContext,
@NotNull UnaryOperator<Table> transformer,
boolean expectRefreshingResults);
boolean expectRefreshingResults,
@NotNull Dependency... dependencies);

/**
* <p>
Expand All @@ -270,22 +284,30 @@ PartitionedTable transform(
* {@code other} has a refreshing {@link #table() underlying table}.
* <p>
*
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for
* empty input tables. It is required to install an ExecutionContext to access any
* QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}.
*
* @param other The other PartitionedTable to find constituents in
* @param transformer The {@link BinaryOperator} to apply to all pairs of constituent {@link Table tables}
* @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added,
* modified, or newly-matched constituents during update processing; use this when {@code transformer} uses
* additional {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} or
* {@code other}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !other.table().isRefreshing()} and {@code transformer} produces a
* refreshing result for any constituent
* @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for
* empty input tables. It is required to install an ExecutionContext to access any
* QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}.
*/
default PartitionedTable partitionedTransform(
@NotNull PartitionedTable other,
@NotNull BinaryOperator<Table> transformer) {
return partitionedTransform(other, ExecutionContext.getContextToRecord(), transformer,
table().isRefreshing() || other.table().isRefreshing());
@NotNull final PartitionedTable other,
@NotNull final BinaryOperator<Table> transformer,
@NotNull final Dependency... dependencies) {
return partitionedTransform(
other,
ExecutionContext.getContextToRecord(),
transformer,
table().isRefreshing() || other.table().isRefreshing(),
dependencies);
}

/**
Expand Down Expand Up @@ -317,16 +339,22 @@ default PartitionedTable partitionedTransform(
* backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that
* might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying
* {@code false} will result in exceptions.
* @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added,
* modified, or newly-matched constituents during update processing; use this when {@code transformer} uses
* additional {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} or
* {@code other}
* @return The new PartitionedTable containing the resulting constituents
* @throws IllegalStateException On instantiation or update if
* {@code !table().isRefreshing() && !other.table().isRefreshing() && !expectRefreshingResults} and
* {@code transformer} produces a refreshing result for any constituent
* {@code !table().isRefreshing() && !other.table().isRefreshing() &&
* !expectRefreshingResults} and {@code transformer} produces a refreshing result for
* any constituent
*/
PartitionedTable partitionedTransform(
@NotNull PartitionedTable other,
@Nullable ExecutionContext executionContext,
@NotNull BinaryOperator<Table> transformer,
boolean expectRefreshingResults);
boolean expectRefreshingResults,
@NotNull Dependency... dependencies);

/**
* <p>
Expand All @@ -343,10 +371,10 @@ PartitionedTable partitionedTransform(
* times.
*
* @param keyColumnValues Ordered, boxed values for the key columns in the same order as {@link #keyColumnNames()}
* @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()}
* @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found
* @return The {@link Table constituent} at the single row in {@link #table()} matching the {@code keyColumnValues},
* or {@code null} if no matches were found
* @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()}
* @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found
*/
@ConcurrentMethod
Table constituentFor(@NotNull Object... keyColumnValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import java.util.Collection;
import java.util.ServiceLoader;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/**
* Factory for producing Deephaven engine {@link PartitionedTable} instances.
Expand Down Expand Up @@ -73,9 +71,9 @@ private static Creator partitionedTableCreator() {
* @param table The "raw" {@link Table table} of {@link Table tables}. Should be {@link Table#isRefreshing()
* refreshing} if any constituents are.
* @param keyColumnNames The "key" column names from {@code table}. Key columns are used in
* {@link PartitionedTable#transform(UnaryOperator)} to validate the safety and correctness of join
* operations and in {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator)} to
* correlate tables that should be transformed together. Passing an ordered set is highly recommended.
* {@link PartitionedTable#transform transform} to validate the safety and correctness of join operations and
* in {@link PartitionedTable#partitionedTransform partitionedTransform} to correlate tables that should be
* transformed together. Passing an ordered set is highly recommended.
* @param uniqueKeys Whether the keys (key column values for a row considered as a tuple) in {@code table} are
* guaranteed to be unique
* @param constituentColumnName The "constituent" column name from {@code table}. The constituent column contains
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,10 @@ private static OperatorAggregationStateManager makeInitializedStateManager(
outputPosition.setValue(0);
final OperatorAggregationStateManager stateManager = stateManagerSupplier.get();

if (initialKeys.isEmpty()) {
return stateManager;
}

final ColumnSource<?>[] keyColumnsToInsert;
final boolean closeRowsToInsert;
final RowSequence rowsToInsert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Base {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform} and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.function.BinaryOperator;

/**
* {@link SelectColumn} implementation to wrap transformer functions for
* {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator) partitioned transformations}.
* {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#partitionedTransform
* partitioned transformations}.
*/
class BiTableTransformationColumn extends BaseTableTransformationColumn {

Expand Down
Loading

0 comments on commit 0974068

Please sign in to comment.