diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java index bb4990d8f6f..d9694d24573 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/FunctionalColumn.java @@ -176,7 +176,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext, @NotNull final WritableChunk destination, @NotNull final RowSequence rowSequence) { final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext; - ctx.chunkFiller.fillByIndices(this, rowSequence, destination); + ctx.chunkFiller.fillPrevByIndices(this, rowSequence, destination); } }, false); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java index 377b8402014..eacc1f7c725 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/MultiSourceFunctionalColumn.java @@ -172,7 +172,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext, @NotNull final WritableChunk destination, @NotNull final RowSequence rowSequence) { final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext; - ctx.chunkFiller.fillByIndices(this, rowSequence, destination); + ctx.chunkFiller.fillPrevByIndices(this, rowSequence, destination); } }, false); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java index 247018ec15b..171ff53aed3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java @@ -5,13 +5,31 @@ import io.deephaven.base.SleepUtil; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.updategraph.LogicalClock; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.util.TableTools; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.NamingThreadFactory; import org.apache.commons.lang3.mutable.MutableLong; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static io.deephaven.engine.testutil.TstUtils.addToTable; +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.engine.testutil.TstUtils.i; +import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.stringCol; + public class TestConstructSnapshot extends RefreshingTableTestCase { public void testClockChange() throws InterruptedException { @@ -64,4 +82,74 @@ public UpdateGraph getUpdateGraph() { t2.join(); assertEquals(1, changed.longValue()); } + + public void testUsePrevSnapshot() throws ExecutionException, InterruptedException { + final ExecutorService executor = Executors.newSingleThreadExecutor( + new NamingThreadFactory(TestConstructSnapshot.class, "TestConstructSnapshot Executor")); + + final QueryTable table = testRefreshingTable(i(1000).toTracking(), intCol("I", 10)); + FunctionalColumn plusOneColumn = + new FunctionalColumn<>("I", Integer.class, "S2", String.class, (Integer i) -> Integer.toString(i + 1)); + final QueryTable functionalTable = (QueryTable) table.updateView(List.of(plusOneColumn)); + + final BitSet oneBit = new BitSet(); + oneBit.set(0); + final BitSet twoBits = new BitSet(); + twoBits.set(0, 2); + + final InitialSnapshot initialSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10)); + final InitialSnapshot funcSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10)); + + final InitialSnapshotTable initialSnapshotTable = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot); + final InitialSnapshotTable funcSnapshotTable = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot); + + assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable); + assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable); + + final ControlledUpdateGraph ug = ExecutionContext.getContext().getUpdateGraph().cast(); + + ug.startCycleForUnitTests(false); + addToTable(table, i(1000), intCol("I", 20)); + final InitialSnapshot initialSnapshot2 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot funcSnapshot2 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + table.notifyListeners(i(), i(), i(1000)); + ug.markSourcesRefreshedForUnitTests(); + + // noinspection StatementWithEmptyBody + while (ug.flushOneNotificationForUnitTests()); + + final InitialSnapshot initialSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); + final InitialSnapshot funcSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); + ug.completeCycleForUnitTests(); + + final InitialSnapshotTable initialSnapshotTable2 = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot2); + final InitialSnapshotTable initialSnapshotTable3 = + InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot3); + + assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable2); + assertTableEquals(TableTools.newTable(intCol("I", 20)), initialSnapshotTable3); + + final InitialSnapshotTable funcSnapshotTable2 = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot2); + final InitialSnapshotTable funcSnapshotTable3 = + InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot3); + + assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable2); + assertTableEquals(TableTools.newTable(intCol("I", 20), stringCol("S2", "21")), funcSnapshotTable3); + + executor.shutdownNow(); + } }