From df0aa7f9545db350f1c33580ad9fa7b6940891ea Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 26 Jul 2023 15:55:58 -0700 Subject: [PATCH 1/2] Port of DH-13185: FunctionalColumn fillPrevChunk was looking at current instead of previous --- .../table/impl/select/FunctionalColumn.java | 2 +- .../select/MultiSourceFunctionalColumn.java | 2 +- .../impl/remote/TestConstructSnapshot.java | 88 +++++++++++++++++++ 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java index bb4990d8f6f..d9694d24573 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java @@ -176,7 +176,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext, @NotNull final WritableChunk destination, @NotNull final RowSequence rowSequence) { final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext; - ctx.chunkFiller.fillByIndices(this, rowSequence, destination); + ctx.chunkFiller.fillPrevByIndices(this, rowSequence, destination); } }, false); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java index 377b8402014..eacc1f7c725 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java @@ -172,7 +172,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext, @NotNull final WritableChunk destination, @NotNull final RowSequence rowSequence) { final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext; - ctx.chunkFiller.fillByIndices(this, rowSequence, destination); + ctx.chunkFiller.fillPrevByIndices(this, rowSequence, destination); } }, false); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java index 247018ec15b..8ba61be1913 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java @@ -5,13 +5,31 @@ import io.deephaven.base.SleepUtil; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.util.TableTools; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.NamingThreadFactory; import org.apache.commons.lang3.mutable.MutableLong; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static io.deephaven.engine.testutil.TstUtils.addToTable; +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.engine.testutil.TstUtils.i; +import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.stringCol; + public class TestConstructSnapshot extends RefreshingTableTestCase { public void testClockChange() throws InterruptedException { @@ -64,4 +82,74 @@ public UpdateGraph getUpdateGraph() { t2.join(); assertEquals(1, changed.longValue()); } + + public void testUsePrevSnapshot() throws ExecutionException, InterruptedException { + final ExecutorService executor = Executors.newSingleThreadExecutor( + new NamingThreadFactory(TestConstructSnapshot.class, "TestConstructSnapshot Executor")); + + final QueryTable table = testRefreshingTable(i(1000).toTracking(), intCol("I", 10)); + FunctionalColumn plusOneColumn = + new FunctionalColumn<>("I", Integer.class, "S2", String.class, (Integer i) -> Integer.toString(i + 1)); + final QueryTable functionalTable = (QueryTable) table.updateView(List.of(plusOneColumn)); + + TableTools.show(functionalTable); + + final BitSet oneBit = new BitSet(); + oneBit.set(0); + final BitSet twoBits = new BitSet(); + twoBits.set(0); + twoBits.set(1); + final InitialSnapshot initialSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace(table, table, + oneBit, RowSetFactory.fromRange(0, 10)); + final InitialSnapshot funcSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, + functionalTable, twoBits, RowSetFactory.fromRange(0, 10)); + + final InitialSnapshotTable initialSnapshotTable = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot); + TableTools.showWithRowSet(initialSnapshotTable); + + final InitialSnapshotTable funcSnapshotTable = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot); + TableTools.showWithRowSet(funcSnapshotTable); + + assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable); + assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable); + + ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + addToTable(table, i(1000), intCol("I", 20)); + final InitialSnapshot initialSnapshot2 = executor.submit(() -> ConstructSnapshot + .constructInitialSnapshotInPositionSpace(table, table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot funcSnapshot2 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, + functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + table.notifyListeners(i(), i(), i(1000)); + + while (ExecutionContext.getContext().getUpdateGraph().cast() + .flushOneNotificationForUnitTests()); + + final InitialSnapshot initialSnapshot3 = executor.submit(() -> ConstructSnapshot + .constructInitialSnapshotInPositionSpace(table, table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot funcSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, + functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + + final InitialSnapshotTable initialSnapshotTable2 = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot2); + final InitialSnapshotTable initialSnapshotTable3 = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot3); + + assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable2); + assertTableEquals(TableTools.newTable(intCol("I", 20)), initialSnapshotTable3); + + final InitialSnapshotTable funcSnapshotTable2 = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot2); + final InitialSnapshotTable funcSnapshotTable3 = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot3); + + assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable2); + assertTableEquals(TableTools.newTable(intCol("I", 20), stringCol("S2", "21")), funcSnapshotTable3); + + executor.shutdownNow(); + } } From 57890a6f4ae6d2b9fdb7590e7d704d5a72bbc5e7 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Wed, 26 Jul 2023 22:12:15 -0400 Subject: [PATCH 2/2] Clean up and fix TestConstructSnapshot.testUsePrevSnapshot() --- .../impl/remote/TestConstructSnapshot.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java index 8ba61be1913..171ff53aed3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java @@ -92,47 +92,47 @@ public void testUsePrevSnapshot() throws ExecutionException, InterruptedExceptio new FunctionalColumn<>("I", Integer.class, "S2", String.class, (Integer i) -> Integer.toString(i + 1)); final QueryTable functionalTable = (QueryTable) table.updateView(List.of(plusOneColumn)); - TableTools.show(functionalTable); - final BitSet oneBit = new BitSet(); oneBit.set(0); final BitSet twoBits = new BitSet(); - twoBits.set(0); - twoBits.set(1); - final InitialSnapshot initialSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace(table, table, - oneBit, RowSetFactory.fromRange(0, 10)); - final InitialSnapshot funcSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, - functionalTable, twoBits, RowSetFactory.fromRange(0, 10)); + twoBits.set(0, 2); + + final InitialSnapshot initialSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10)); + final InitialSnapshot funcSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10)); final InitialSnapshotTable initialSnapshotTable = InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot); - TableTools.showWithRowSet(initialSnapshotTable); - final InitialSnapshotTable funcSnapshotTable = InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot); - TableTools.showWithRowSet(funcSnapshotTable); assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable); assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable); - ExecutionContext.getContext().getUpdateGraph().cast().startCycleForUnitTests(); + final ControlledUpdateGraph ug = ExecutionContext.getContext().getUpdateGraph().cast(); + + ug.startCycleForUnitTests(false); addToTable(table, i(1000), intCol("I", 20)); - final InitialSnapshot initialSnapshot2 = executor.submit(() -> ConstructSnapshot - .constructInitialSnapshotInPositionSpace(table, table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot initialSnapshot2 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); final InitialSnapshot funcSnapshot2 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, - functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); table.notifyListeners(i(), i(), i(1000)); + ug.markSourcesRefreshedForUnitTests(); - while (ExecutionContext.getContext().getUpdateGraph().cast() - .flushOneNotificationForUnitTests()); + // noinspection StatementWithEmptyBody + while (ug.flushOneNotificationForUnitTests()); - final InitialSnapshot initialSnapshot3 = executor.submit(() -> ConstructSnapshot - .constructInitialSnapshotInPositionSpace(table, table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot initialSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); final InitialSnapshot funcSnapshot3 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace(functionalTable, - functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); - ExecutionContext.getContext().getUpdateGraph().cast().completeCycleForUnitTests(); + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + ug.completeCycleForUnitTests(); final InitialSnapshotTable initialSnapshotTable2 = InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot2);