From bbb47172a71ed0a75e9186d7540cc57c3789dca2 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 08:45:40 -0400 Subject: [PATCH 01/20] Update unit test to use a better interface. --- .../table/impl/select/TimeSeriesFilter.java | 31 ++++++-- .../engine/table/impl/util/TestClock.java | 73 +++++++++++++++++++ .../table/impl/util/TestTimeSeriesFilter.java | 68 +++++------------ 3 files changed, 114 insertions(+), 58 deletions(-) create mode 100644 engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 497957f4e45..755f291f469 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -17,10 +17,12 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.table.ColumnSource; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Objects; /** * This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column). @@ -33,19 +35,31 @@ public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl implements Runnable, NotificationQueue.Dependency { - protected final String columnName; - protected final long nanos; + private final String columnName; + private final long nanos; + private final Clock clock; + private RecomputeListener listener; @SuppressWarnings("UnusedDeclaration") - public TimeSeriesFilter(String columnName, String period) { + public TimeSeriesFilter(final String columnName, + final String period) { this(columnName, DateTimeUtils.parseDurationNanos(period)); } - public TimeSeriesFilter(String columnName, long nanos) { - Require.gtZero(nanos, "nanos"); + // TODO: invert + public TimeSeriesFilter(final String columnName, + final long nanos) { + this(columnName, nanos, null); + } + + public TimeSeriesFilter(final String columnName, + final long periodNanos, + @Nullable final Clock clock) { + Require.gtZero(periodNanos, "periodNanos"); this.columnName = columnName; - this.nanos = nanos; + this.nanos = periodNanos; + this.clock = clock; } @Override @@ -72,6 +86,7 @@ public WritableRowSet filter( throw new PreviousFilteringNotSupported(); } + // TODO: reinterpret this appropriately, or maybe delegate to the window checking stuff ColumnSource dateColumn = table.getColumnSource(columnName); if (!Instant.class.isAssignableFrom(dateColumn.getType())) { throw new RuntimeException(columnName + " is not an Instant column!"); @@ -93,7 +108,7 @@ public WritableRowSet filter( } protected long getNowNanos() { - return Clock.system().currentTimeNanos(); + return Objects.requireNonNullElseGet(clock, DateTimeUtils::currentClock).currentTimeNanos(); } @Override @@ -122,7 +137,7 @@ public UpdateGraph getUpdateGraph() { @Override public TimeSeriesFilter copy() { - return new TimeSeriesFilter(columnName, nanos); + return new TimeSeriesFilter(columnName, nanos, clock); } @Override diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java new file mode 100644 index 00000000000..73695570e8e --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java @@ -0,0 +1,73 @@ +package io.deephaven.engine.table.impl.util; + +import io.deephaven.base.clock.Clock; +import io.deephaven.time.DateTimeUtils; + +import java.time.Instant; + +/** + * A clock that has a fixed time for use in unit tests. + */ +public class TestClock implements Clock { + private long nanos; + + /** + * Set the time. + * @param epochNanos the time to set in nanos since the epoch + * @return this clock + */ + public TestClock setNanos(long epochNanos) { + this.nanos = epochNanos; + return this; + } + + /** + * Set the time. + * @param epochMillis the time to set in millis since the epoch + * @return this clock + */ + public TestClock setMillis(long epochMillis) { + this.nanos = DateTimeUtils.millisToNanos(epochMillis); + return this; + } + + /** + * Add millis to the current time. + * @param millis the number of millis to add + * @return this clock + */ + public TestClock addMillis(long millis) { + this.nanos += DateTimeUtils.millisToNanos(millis); + return this; + } + + @Override + public long currentTimeMillis() { + return DateTimeUtils.nanosToMillis(nanos); + } + + @Override + public long currentTimeMicros() { + return DateTimeUtils.nanosToMicros(nanos); + } + + @Override + public long currentTimeNanos() { + return nanos; + } + + @Override + public Instant instantNanos() { + return DateTimeUtils.epochNanosToInstant(nanos); + } + + @Override + public Instant instantMillis() { + return instantNanos(); + } + + @Override + public String toString() { + return "TestClock{" + DateTimeUtils.epochNanosToInstant(nanos) + "}"; + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java index f71f8b3c01c..15fe75e7e92 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java @@ -40,7 +40,9 @@ public void testSimple() { Table source = TableTools.newTable(TableTools.col("Timestamp", times)); TableTools.show(source); - UnitTestTimeSeriesFilter timeSeriesFilter = new UnitTestTimeSeriesFilter(startTime, "Timestamp", "PT00:00:05"); + final TestClock testClock = new TestClock().setMillis(startTime); + + final TimeSeriesFilter timeSeriesFilter = new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), testClock); Table filtered = source.where(timeSeriesFilter); TableTools.show(filtered); @@ -48,7 +50,7 @@ public void testSimple() { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { - timeSeriesFilter.incrementNow(5000); + testClock.addMillis(5000); timeSeriesFilter.run(); }); @@ -56,15 +58,17 @@ public void testSimple() { assertEquals(10, filtered.size()); updateGraph.runWithinUnitTestCycle(() -> { - timeSeriesFilter.incrementNow(5000); + testClock.addMillis(5000); timeSeriesFilter.run(); }); + System.out.println(testClock); + TableTools.show(filtered); assertEquals(5, filtered.size()); updateGraph.runWithinUnitTestCycle(() -> { - timeSeriesFilter.incrementNow(2000); + testClock.addMillis(2000); timeSeriesFilter.run(); }); @@ -85,15 +89,16 @@ public void testIncremental() throws ParseException { new DateGenerator(startDate, endDate), new IntGenerator(1, 100))); - final UnitTestTimeSeriesFilter unitTestTimeSeriesFilter = - new UnitTestTimeSeriesFilter(startDate.getTime(), "Date", "PT01:00:00"); - final ArrayList> filtersToRefresh = new ArrayList<>(); + final TestClock testClock = new TestClock().setMillis(startDate.getTime()); + + final TimeSeriesFilter unitTestTimeSeriesFilter = + new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), testClock); + final ArrayList> filtersToRefresh = new ArrayList<>(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); EvalNugget[] en = new EvalNugget[] { EvalNugget.from(() -> { - UnitTestTimeSeriesFilter unitTestTimeSeriesFilter1 = - new UnitTestTimeSeriesFilter(unitTestTimeSeriesFilter); + TimeSeriesFilter unitTestTimeSeriesFilter1 = unitTestTimeSeriesFilter.copy(); filtersToRefresh.add(new WeakReference<>(unitTestTimeSeriesFilter1)); return updateGraph.exclusiveLock().computeLocked( () -> table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)") @@ -108,15 +113,14 @@ public void testIncremental() throws ParseException { simulateShiftAwareStep(size, random, table, columnInfo, en); } else { updateGraph.runWithinUnitTestCycle(() -> { - unitTestTimeSeriesFilter.incrementNow(3600 * 1000); + testClock.addMillis(3600 * 1000); - final ArrayList> collectedRefs = new ArrayList<>(); - for (WeakReference ref : filtersToRefresh) { - final UnitTestTimeSeriesFilter refreshFilter = ref.get(); + final ArrayList> collectedRefs = new ArrayList<>(); + for (WeakReference ref : filtersToRefresh) { + final TimeSeriesFilter refreshFilter = ref.get(); if (refreshFilter == null) { collectedRefs.add(ref); } else { - refreshFilter.setNow(unitTestTimeSeriesFilter.getNowLong()); refreshFilter.run(); } } @@ -126,40 +130,4 @@ public void testIncremental() throws ParseException { } } } - - private static class UnitTestTimeSeriesFilter extends TimeSeriesFilter { - long now; - - public UnitTestTimeSeriesFilter(long startTime, String timestamp, String period) { - super(timestamp, period); - now = startTime; - } - - public UnitTestTimeSeriesFilter(long startTime, String timestamp, long period) { - super(timestamp, period); - now = startTime; - } - - public UnitTestTimeSeriesFilter(UnitTestTimeSeriesFilter other) { - this(other.now, other.columnName, other.nanos); - } - - @Override - protected long getNowNanos() { - return now * 1000000L; - } - - long getNowLong() { - return now; - } - - void incrementNow(long increment) { - now += increment; - } - - void setNow(long newValue) { - now = newValue; - } - } - } From 2f84ab5d6954b4344e458d35c770d7934e670cf9 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 11:19:45 -0400 Subject: [PATCH 02/20] use nested window check. --- .../table/impl/select/TimeSeriesFilter.java | 142 +++++++++++++----- .../io/deephaven/engine/util/WindowCheck.java | 90 ++++++++--- .../TestTimeSeriesFilter.java | 21 ++- 3 files changed, 185 insertions(+), 68 deletions(-) rename engine/table/src/test/java/io/deephaven/engine/table/impl/{util => select}/TestTimeSeriesFilter.java (88%) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 755f291f469..32baea75b38 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -3,26 +3,32 @@ // package io.deephaven.engine.table.impl.select; +import io.deephaven.base.Pair; import io.deephaven.base.clock.Clock; import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; +import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetBuilderSequential; -import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.TableUpdate; +import io.deephaven.engine.table.impl.ListenerRecorder; +import io.deephaven.engine.table.impl.MergedListener; +import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.engine.util.WindowCheck; import io.deephaven.time.DateTimeUtils; -import io.deephaven.engine.table.ColumnSource; +import io.deephaven.util.annotations.TestUseOnly; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; /** * This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column). @@ -34,32 +40,54 @@ */ public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl - implements Runnable, NotificationQueue.Dependency { + implements NotificationQueue.Dependency { private final String columnName; - private final long nanos; + private final long periodNanos; + private final boolean invert; private final Clock clock; private RecomputeListener listener; + // this table contains our window + private QueryTable tableWithWindow; + private String windowSourceName; + private Runnable refreshFunctionForUnitTests; + + /** + * We create the merged listener so that we can participate in dependency resolution, but the dependencies are not + * actually known until the first call to filter(). + */ + private final List windowListenerRecorder = new CopyOnWriteArrayList<>(); + private final List windowDependency = new CopyOnWriteArrayList<>(); + private final TimeSeriesFilterMergedListener mergedListener; @SuppressWarnings("UnusedDeclaration") public TimeSeriesFilter(final String columnName, - final String period) { + final String period) { this(columnName, DateTimeUtils.parseDurationNanos(period)); } - // TODO: invert public TimeSeriesFilter(final String columnName, - final long nanos) { - this(columnName, nanos, null); + final long nanos) { + this(columnName, nanos, false); + } + + public TimeSeriesFilter(final String columnName, + final long nanos, + final boolean invert) { + this(columnName, nanos, invert, null); } public TimeSeriesFilter(final String columnName, - final long periodNanos, - @Nullable final Clock clock) { + final long periodNanos, + final boolean invert, + @Nullable final Clock clock) { Require.gtZero(periodNanos, "periodNanos"); this.columnName = columnName; - this.nanos = periodNanos; + this.periodNanos = periodNanos; + this.invert = invert; this.clock = clock; + this.mergedListener = new TimeSeriesFilterMergedListener( + "TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")"); } @Override @@ -77,7 +105,7 @@ public void init(@NotNull final TableDefinition tableDefinition) {} @NotNull @Override - public WritableRowSet filter( + public synchronized WritableRowSet filter( @NotNull final RowSet selection, @NotNull final RowSet fullSet, @NotNull final Table table, @@ -86,29 +114,28 @@ public WritableRowSet filter( throw new PreviousFilteringNotSupported(); } - // TODO: reinterpret this appropriately, or maybe delegate to the window checking stuff - ColumnSource dateColumn = table.getColumnSource(columnName); - if (!Instant.class.isAssignableFrom(dateColumn.getType())) { - throw new RuntimeException(columnName + " is not an Instant column!"); - } + if (tableWithWindow == null) { + windowSourceName = "__Window_" + columnName; + while (table.getDefinition().getColumnNames().contains(windowSourceName)) { + windowSourceName = "_" + windowSourceName; + } + final Pair pair = WindowCheck.addTimeWindowInternal(clock, (QueryTable) table, columnName, periodNanos + 1, windowSourceName, true); + tableWithWindow = (QueryTable)pair.first; + refreshFunctionForUnitTests = pair.second; - long nanoBoundary = getNowNanos() - nanos; + manage(tableWithWindow); + windowDependency.add(tableWithWindow); + final ListenerRecorder recorder = new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); + tableWithWindow.addUpdateListener(recorder); + recorder.setMergedListener(mergedListener); - RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential(); - for (RowSet.Iterator it = selection.iterator(); it.hasNext();) { - long row = it.nextLong(); - Instant instant = dateColumn.get(row); - long nanoValue = DateTimeUtils.epochNanos(instant); - if (nanoValue >= nanoBoundary) { - indexBuilder.appendKey(row); - } - } + windowListenerRecorder.add(recorder); - return indexBuilder.build(); - } + // we are doing the first match, which is based on the entire set of values in the table + mergedListener.insertMatched(fullSet); + } - protected long getNowNanos() { - return Objects.requireNonNullElseGet(clock, DateTimeUtils::currentClock).currentTimeNanos(); + return selection.intersect(mergedListener.inWindowRowset); } @Override @@ -122,7 +149,6 @@ public void setRecomputeListener(RecomputeListener listener) { Assert.eqNull(this.listener, "this.listener"); this.listener = listener; listener.setIsRefreshing(true); - updateGraph.addSource(this); } @Override @@ -137,7 +163,7 @@ public UpdateGraph getUpdateGraph() { @Override public TimeSeriesFilter copy() { - return new TimeSeriesFilter(columnName, nanos, clock); + return new TimeSeriesFilter(columnName, periodNanos, invert, clock); } @Override @@ -146,13 +172,47 @@ public boolean isRefreshing() { } @Override - public void run() { - listener.requestRecomputeMatched(); + public boolean permitParallelization() { + return false; } - @Override - protected void destroy() { - super.destroy(); - updateGraph.removeSource(this); + private class TimeSeriesFilterMergedListener extends MergedListener { + final WritableRowSet inWindowRowset = RowSetFactory.empty(); + + protected TimeSeriesFilterMergedListener(String listenerDescription) { + super(windowListenerRecorder, windowDependency, listenerDescription, null); + } + + @Override + protected void process() { + synchronized (TimeSeriesFilter.this) { + final TableUpdate update = windowListenerRecorder.get(0).getUpdate().acquire(); + + inWindowRowset.remove(update.removed()); + if (update.modifiedColumnSet().containsAll(tableWithWindow.newModifiedColumnSet(windowSourceName))) { + // we need to check on the modified rows; they may be in the window, + inWindowRowset.remove(update.getModifiedPreShift()); + insertMatched(update.modified()); + } + insertMatched(update.added()); + + if (invert) { + listener.requestRecomputeUnmatched(); + } else { + listener.requestRecomputeMatched(); + } + } + } + + private void insertMatched(final RowSet rowSet) { + try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, null, rowSet, Boolean.TRUE)) { + inWindowRowset.insert(matched); + } + } + } + + @TestUseOnly + void runForUnitTests() { + refreshFunctionForUnitTests.run(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 278814c44b0..e77f77c4d5b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -15,11 +15,7 @@ import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; -import io.deephaven.engine.table.ChunkSource; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.SharedContext; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableUpdate; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; @@ -29,14 +25,15 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.table.impl.*; import io.deephaven.engine.table.impl.AbstractColumnSource; -import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.base.RAPriQueue; import io.deephaven.util.QueryConstants; +import io.deephaven.util.annotations.InternalUseOnly; import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; @@ -92,7 +89,8 @@ private WindowListenerRecorder(Table parent, BaseTable dependent) { * @param addToMonitor should we add this to the PeriodicUpdateGraph * @return a pair of the result table and the TimeWindowListener that drives it */ - static Pair addTimeWindowInternal(Clock clock, QueryTable table, + @InternalUseOnly + public static Pair addTimeWindowInternal(Clock clock, QueryTable table, String timestampColumn, long windowNanos, String inWindowColumn, boolean addToMonitor) { if (table.isRefreshing()) { table.getUpdateGraph().checkInitiateSerialTableOperation(); @@ -111,19 +109,28 @@ static Pair addTimeWindowInternal(Clock clock, QueryT final QueryTable result = new QueryTable(table.getRowSet(), resultColumns); final WindowListenerRecorder recorder = new WindowListenerRecorder(table, result); - final TimeWindowListener timeWindowListener = - new TimeWindowListener(inWindowColumn, inWindowColumnSource, recorder, table, result); - recorder.setMergedListener(timeWindowListener); + final TimeWindowListenerImpl timeWindowListenerImpl = + new TimeWindowListenerImpl(inWindowColumn, inWindowColumnSource, recorder, table, result); + recorder.setMergedListener(timeWindowListenerImpl); if (table.isRefreshing()) { table.addUpdateListener(recorder); } - timeWindowListener.addRowSequence(table.getRowSet(), false); - result.addParentReference(timeWindowListener); + timeWindowListenerImpl.addRowSequence(table.getRowSet(), false); + result.addParentReference(timeWindowListenerImpl); result.manage(table); if (addToMonitor) { - result.getUpdateGraph().addSource(timeWindowListener); + result.getUpdateGraph().addSource(timeWindowListenerImpl); } - return new Pair<>(result, timeWindowListener); + return new Pair<>(result, timeWindowListenerImpl); + } + + /** + * This interface is only for Deephaven internal use. + */ + @InternalUseOnly + public interface TimeWindowListener extends Runnable { + void validateQueue(); + void dumpQueue(); } /** @@ -134,7 +141,7 @@ static Pair addTimeWindowInternal(Clock clock, QueryT * It implements {@link Runnable}, so that we can be inserted into the {@link PeriodicUpdateGraph}. *

*/ - static class TimeWindowListener extends MergedListener implements Runnable { + static class TimeWindowListenerImpl extends MergedListener implements TimeWindowListener { private final InWindowColumnSource inWindowColumnSource; private final QueryTable result; /** @@ -202,8 +209,8 @@ public String toString() { * @param source the source table * @param result our initialized result table */ - private TimeWindowListener(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource, - final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { + private TimeWindowListenerImpl(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource, + final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { super(Collections.singleton(recorder), Collections.singleton(source), "WindowCheck", result); this.source = source; this.recorder = recorder; @@ -802,7 +809,8 @@ private RowSet recomputeModified() { return builder.build(); } - void validateQueue() { + @Override + public void validateQueue() { final RowSet resultRowSet = result.getRowSet(); final RowSetBuilderRandom builder = RowSetFactory.builderRandom(); @@ -866,7 +874,8 @@ void validateQueue() { } } - void dumpQueue() { + @Override + public void dumpQueue() { final Entry[] entries = new Entry[priorityQueue.size()]; priorityQueue.dump(entries, 0); System.out.println("Queue size: " + entries.length); @@ -1048,5 +1057,48 @@ private long timeStampForPrev() { final long currentStep = updateGraph.clock().currentStep(); return (clockStep < currentStep || clockStep == initialStep) ? currentTime : prevTime; } + + @Override + public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseInsensitive, @Nullable DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { + final boolean includeNull = Arrays.asList(keys).contains(null) ^ invertMatch; + final boolean includeTrue = Arrays.asList(keys).contains(true) ^ invertMatch; + final boolean includeFalse = Arrays.asList(keys).contains(false) ^ invertMatch; + + final int getSize = (int)Math.min(4096, mapper.size()); + + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + + try (final GetContext getContext = timeStampSource.makeGetContext(getSize); + final RowSequence.Iterator rsit = mapper.getRowSequenceIterator()) { + while (rsit.hasMore()) { + final RowSequence chunkRs = rsit.getNextRowSequenceWithLength(getSize); + final LongChunk rowKeys = chunkRs.asRowKeyChunk(); + final LongChunk timeStamps; + if (usePrev) { + timeStamps = timeStampSource.getPrevChunk(getContext, chunkRs).asLongChunk(); + } else { + timeStamps = timeStampSource.getChunk(getContext, chunkRs).asLongChunk(); + } + for (int ii = 0; ii < rowKeys.size(); ++ii) { + final long rowKey = rowKeys.get(ii); + if (timeStamps.get(ii) == QueryConstants.NULL_LONG) { + if (includeNull) { + builder.appendKey(rowKey); + } + } else { + final boolean inWindow = computeInWindowUnsafe(timeStamps.get(ii), currentTime); + if (includeTrue && inWindow) { + builder.appendKey(rowKey); + } + if (includeFalse && !inWindow) { + builder.appendKey(rowKey); + } + } + } + } + } + + return builder.build(); + } } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java similarity index 88% rename from engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java rename to engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 15fe75e7e92..c0575924106 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -1,10 +1,11 @@ // // Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending // -package io.deephaven.engine.table.impl.util; +package io.deephaven.engine.table.impl.select; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.util.TestClock; import io.deephaven.engine.testutil.ColumnInfo; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.generator.DateGenerator; @@ -14,7 +15,6 @@ import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.testutil.TstUtils; -import io.deephaven.engine.table.impl.select.TimeSeriesFilter; import io.deephaven.time.DateTimeUtils; import java.lang.ref.WeakReference; @@ -42,7 +42,7 @@ public void testSimple() { final TestClock testClock = new TestClock().setMillis(startTime); - final TimeSeriesFilter timeSeriesFilter = new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), testClock); + final TimeSeriesFilter timeSeriesFilter = new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), false, testClock); Table filtered = source.where(timeSeriesFilter); TableTools.show(filtered); @@ -51,7 +51,7 @@ public void testSimple() { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); updateGraph.runWithinUnitTestCycle(() -> { testClock.addMillis(5000); - timeSeriesFilter.run(); + timeSeriesFilter.runForUnitTests(); }); TableTools.show(filtered); @@ -59,7 +59,7 @@ public void testSimple() { updateGraph.runWithinUnitTestCycle(() -> { testClock.addMillis(5000); - timeSeriesFilter.run(); + timeSeriesFilter.runForUnitTests(); }); System.out.println(testClock); @@ -69,7 +69,7 @@ public void testSimple() { updateGraph.runWithinUnitTestCycle(() -> { testClock.addMillis(2000); - timeSeriesFilter.run(); + timeSeriesFilter.runForUnitTests(); }); TableTools.show(filtered); @@ -92,7 +92,7 @@ public void testIncremental() throws ParseException { final TestClock testClock = new TestClock().setMillis(startDate.getTime()); final TimeSeriesFilter unitTestTimeSeriesFilter = - new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), testClock); + new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); final ArrayList> filtersToRefresh = new ArrayList<>(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); @@ -121,7 +121,7 @@ public void testIncremental() throws ParseException { if (refreshFilter == null) { collectedRefs.add(ref); } else { - refreshFilter.run(); + refreshFilter.runForUnitTests(); } } filtersToRefresh.removeAll(collectedRefs); @@ -130,4 +130,9 @@ public void testIncremental() throws ParseException { } } } + + // TODO: test in a sequence of filters, with a dynamic where filter in front of us + // TODO: test inverted filters + // TODO: test actual modifications and additions to the table + // TODO: test when nothing actually changes from the window check perspective } From 4b73c0d8420ec499d3280a4d4c7cdb5110928acc Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 11:21:50 -0400 Subject: [PATCH 03/20] Embed WindowCheck in time series filter. --- .../engine/table/impl/select/TimeSeriesFilter.java | 11 +++++++---- .../java/io/deephaven/engine/util/WindowCheck.java | 10 ++++++---- .../table/impl/select/TestTimeSeriesFilter.java | 3 ++- .../deephaven/engine/table/impl/util/TestClock.java | 6 ++++++ 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 32baea75b38..ca374c28799 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -119,13 +119,15 @@ public synchronized WritableRowSet filter( while (table.getDefinition().getColumnNames().contains(windowSourceName)) { windowSourceName = "_" + windowSourceName; } - final Pair pair = WindowCheck.addTimeWindowInternal(clock, (QueryTable) table, columnName, periodNanos + 1, windowSourceName, true); - tableWithWindow = (QueryTable)pair.first; + final Pair pair = WindowCheck.addTimeWindowInternal(clock, + (QueryTable) table, columnName, periodNanos + 1, windowSourceName, true); + tableWithWindow = (QueryTable) pair.first; refreshFunctionForUnitTests = pair.second; manage(tableWithWindow); windowDependency.add(tableWithWindow); - final ListenerRecorder recorder = new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); + final ListenerRecorder recorder = + new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); tableWithWindow.addUpdateListener(recorder); recorder.setMergedListener(mergedListener); @@ -205,7 +207,8 @@ protected void process() { } private void insertMatched(final RowSet rowSet) { - try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, null, rowSet, Boolean.TRUE)) { + try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, + null, rowSet, Boolean.TRUE)) { inWindowRowset.insert(matched); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index e77f77c4d5b..1af89c56855 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -130,6 +130,7 @@ public static Pair addTimeWindowInternal(Clock clock, @InternalUseOnly public interface TimeWindowListener extends Runnable { void validateQueue(); + void dumpQueue(); } @@ -210,7 +211,7 @@ public String toString() { * @param result our initialized result table */ private TimeWindowListenerImpl(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource, - final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { + final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { super(Collections.singleton(recorder), Collections.singleton(source), "WindowCheck", result); this.source = source; this.recorder = recorder; @@ -1059,17 +1060,18 @@ private long timeStampForPrev() { } @Override - public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseInsensitive, @Nullable DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { + public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseInsensitive, + @Nullable DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { final boolean includeNull = Arrays.asList(keys).contains(null) ^ invertMatch; final boolean includeTrue = Arrays.asList(keys).contains(true) ^ invertMatch; final boolean includeFalse = Arrays.asList(keys).contains(false) ^ invertMatch; - final int getSize = (int)Math.min(4096, mapper.size()); + final int getSize = (int) Math.min(4096, mapper.size()); final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); try (final GetContext getContext = timeStampSource.makeGetContext(getSize); - final RowSequence.Iterator rsit = mapper.getRowSequenceIterator()) { + final RowSequence.Iterator rsit = mapper.getRowSequenceIterator()) { while (rsit.hasMore()) { final RowSequence chunkRs = rsit.getNextRowSequenceWithLength(getSize); final LongChunk rowKeys = chunkRs.asRowKeyChunk(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index c0575924106..08f549ccad4 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -42,7 +42,8 @@ public void testSimple() { final TestClock testClock = new TestClock().setMillis(startTime); - final TimeSeriesFilter timeSeriesFilter = new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), false, testClock); + final TimeSeriesFilter timeSeriesFilter = + new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), false, testClock); Table filtered = source.where(timeSeriesFilter); TableTools.show(filtered); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java index 73695570e8e..28f94e644ab 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java @@ -1,3 +1,6 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// package io.deephaven.engine.table.impl.util; import io.deephaven.base.clock.Clock; @@ -13,6 +16,7 @@ public class TestClock implements Clock { /** * Set the time. + * * @param epochNanos the time to set in nanos since the epoch * @return this clock */ @@ -23,6 +27,7 @@ public TestClock setNanos(long epochNanos) { /** * Set the time. + * * @param epochMillis the time to set in millis since the epoch * @return this clock */ @@ -33,6 +38,7 @@ public TestClock setMillis(long epochMillis) { /** * Add millis to the current time. + * * @param millis the number of millis to add * @return this clock */ From dc5aa8ba14b8ee82ff1c848910e0ebb1b567eea8 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 11:23:09 -0400 Subject: [PATCH 04/20] actually do the inversion. --- .../engine/table/impl/select/TimeSeriesFilter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index ca374c28799..fb89e2ff636 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -137,7 +137,11 @@ public synchronized WritableRowSet filter( mergedListener.insertMatched(fullSet); } - return selection.intersect(mergedListener.inWindowRowset); + if (invert) { + return selection.minus(mergedListener.inWindowRowset); + } else { + return selection.intersect(mergedListener.inWindowRowset); + } } @Override From 5b9f8080e3eaa7e9bf63667e62b841de69a411f7 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 11:23:40 -0400 Subject: [PATCH 05/20] todo --- .../io/deephaven/engine/table/impl/select/TimeSeriesFilter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index fb89e2ff636..da52cf5ff37 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -211,6 +211,7 @@ protected void process() { } private void insertMatched(final RowSet rowSet) { + // TODO: should we include nulls here? try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, null, rowSet, Boolean.TRUE)) { inWindowRowset.insert(matched); From f6f376b8954ab25ba4cc2b33a192c2f28531bd40 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 12:45:18 -0400 Subject: [PATCH 06/20] shifts. --- .../table/impl/select/TimeSeriesFilter.java | 19 +++++++++++++++---- .../impl/select/TestTimeSeriesFilter.java | 5 +---- .../engine/table/impl/util/TestClock.java | 3 ++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index da52cf5ff37..9ed33faee26 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -10,6 +10,7 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.TableUpdate; @@ -122,6 +123,7 @@ public synchronized WritableRowSet filter( final Pair pair = WindowCheck.addTimeWindowInternal(clock, (QueryTable) table, columnName, periodNanos + 1, windowSourceName, true); tableWithWindow = (QueryTable) pair.first; + mergedListener.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); refreshFunctionForUnitTests = pair.second; manage(tableWithWindow); @@ -159,7 +161,7 @@ public void setRecomputeListener(RecomputeListener listener) { @Override public boolean satisfied(long step) { - return updateGraph.satisfied(step); + return mergedListener.satisfied(step); } @Override @@ -184,6 +186,7 @@ public boolean permitParallelization() { private class TimeSeriesFilterMergedListener extends MergedListener { final WritableRowSet inWindowRowset = RowSetFactory.empty(); + private ModifiedColumnSet windowColumnSet; protected TimeSeriesFilterMergedListener(String listenerDescription) { super(windowListenerRecorder, windowDependency, listenerDescription, null); @@ -192,12 +195,18 @@ protected TimeSeriesFilterMergedListener(String listenerDescription) { @Override protected void process() { synchronized (TimeSeriesFilter.this) { - final TableUpdate update = windowListenerRecorder.get(0).getUpdate().acquire(); + final ListenerRecorder recorder = windowListenerRecorder.get(0); + Assert.assertion(recorder.recordedVariablesAreValid(), "recorder.recordedVariablesAreValid()"); + final TableUpdate update = recorder.getUpdate().acquire(); inWindowRowset.remove(update.removed()); - if (update.modifiedColumnSet().containsAll(tableWithWindow.newModifiedColumnSet(windowSourceName))) { + final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet); + if (windowModified) { // we need to check on the modified rows; they may be in the window, inWindowRowset.remove(update.getModifiedPreShift()); + } + update.shifted().apply(inWindowRowset); + if (windowModified) { insertMatched(update.modified()); } insertMatched(update.added()); @@ -211,7 +220,9 @@ protected void process() { } private void insertMatched(final RowSet rowSet) { - // TODO: should we include nulls here? + // The original filter did not include nulls for a regular filter, so we do not include them here either to + // maintain compatibility. That also means the inverted filter is going to include nulls (as the null is + // less than the current time using Deephaven long comparisons). try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, null, rowSet, Boolean.TRUE)) { inWindowRowset.insert(matched); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 08f549ccad4..9c3892f47ed 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -6,15 +6,12 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.util.TestClock; -import io.deephaven.engine.testutil.ColumnInfo; -import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.generator.DateGenerator; import io.deephaven.engine.testutil.generator.IntGenerator; import io.deephaven.engine.util.TableTools; -import io.deephaven.engine.testutil.EvalNugget; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.testutil.TstUtils; import io.deephaven.time.DateTimeUtils; import java.lang.ref.WeakReference; diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java index 28f94e644ab..2279966a512 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java @@ -7,6 +7,7 @@ import io.deephaven.time.DateTimeUtils; import java.time.Instant; +import java.time.ZoneId; /** * A clock that has a fixed time for use in unit tests. @@ -74,6 +75,6 @@ public Instant instantMillis() { @Override public String toString() { - return "TestClock{" + DateTimeUtils.epochNanosToInstant(nanos) + "}"; + return "TestClock{" + DateTimeUtils.epochNanosToInstant(nanos).atZone(ZoneId.systemDefault()) + "}"; } } From 829edeb7f9153a3bf55659f69abd252493c3047e Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 15:40:34 -0400 Subject: [PATCH 07/20] beginOperation and better requestRecompute call. --- .../engine/table/impl/QueryTable.java | 44 +++++- .../table/impl/WouldMatchOperation.java | 7 + .../table/impl/select/TimeSeriesFilter.java | 147 ++++++++++-------- .../engine/table/impl/select/WhereFilter.java | 10 +- 4 files changed, 143 insertions(+), 65 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 1c227ea3ae7..27d2ca50e1e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -974,6 +974,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco private final QueryTable source; private boolean refilterMatchedRequested = false; private boolean refilterUnmatchedRequested = false; + private WritableRowSet refilterRequestedRowset = null; private MergedListener whereListener; @ReferentialIntegrity @@ -1006,6 +1007,16 @@ public void requestRecomputeMatched() { Require.neqNull(whereListener, "whereListener").notifyChanges(); } + @Override + public void requestRecompute(RowSet rowSet) { + if (refilterRequestedRowset == null) { + refilterRequestedRowset = rowSet.copy(); + } else { + refilterRequestedRowset.insert(rowSet); + } + Require.neqNull(whereListener, "whereListener").notifyChanges(); + } + /** * Note that refilterRequested is only accessible so that {@link WhereListener} can get to it and is not part of * the public API. @@ -1014,7 +1025,7 @@ public void requestRecomputeMatched() { */ @InternalUseOnly boolean refilterRequested() { - return refilterUnmatchedRequested || refilterMatchedRequested; + return refilterUnmatchedRequested || refilterMatchedRequested || refilterRequestedRowset != null; } @NotNull @@ -1068,6 +1079,7 @@ void doRefilter( (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = refilterUnmatchedRequested = false; + refilterRequestedRowset = null; } else if (refilterUnmatchedRequested) { // things that are added or removed are already reflected in source.getRowSet final WritableRowSet unmatchedRows = source.getRowSet().minus(getRowSet()); @@ -1075,6 +1087,10 @@ void doRefilter( if (upstream != null) { unmatchedRows.insert(upstream.modified()); } + if (refilterRequestedRowset != null) { + unmatchedRows.insert(refilterRequestedRowset); + refilterRequestedRowset = null; + } final RowSet unmatched = unmatchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched); filterExecution.scheduleCompletion((adds, mods) -> { @@ -1098,6 +1114,10 @@ void doRefilter( matchedRows.insert(upstream.added()); matchedRows.insert(upstream.modified()); } + if (refilterRequestedRowset != null) { + matchedRows.insert(refilterRequestedRowset); + refilterRequestedRowset = null; + } final RowSet matchedClone = matchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = @@ -1106,6 +1126,28 @@ void doRefilter( (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = false; + } else if (refilterRequestedRowset != null) { + final WritableRowSet rowsToFilter = refilterRequestedRowset.copy(); + if (upstream != null) { + rowsToFilter.insert(upstream.added()); + rowsToFilter.insert(upstream.modified()); + } + + final WhereListener.ListenerFilterExecution filterExecution = + listener.makeRefilterExecution(rowsToFilter); + + filterExecution.scheduleCompletion((adds, mods) -> { + final WritableRowSet newMapping = adds.writableCast(); + // add back what we previously matched, but for modifications and removals + try (final WritableRowSet previouslyMatched = getRowSet().copy()) { + previouslyMatched.remove(rowsToFilter); + newMapping.insert(previouslyMatched); + } + completeRefilterUpdate(listener, upstream, update, adds); + }, exception -> errorRefilterUpdate(listener, exception, upstream)); + + + refilterRequestedRowset = null; } else { throw new IllegalStateException("Refilter called when a refilter was not requested!"); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java index 3899e5ca5fb..99f00b9bef3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java @@ -444,6 +444,13 @@ public void requestRecomputeMatched() { Require.neqNull(mergedListener, "mergedListener").notifyChanges(); } + @Override + public void requestRecompute(RowSet rowSet) { + // TODO: No need to recompute the remaining rows + doRecompute = true; + Require.neqNull(mergedListener, "mergedListener").notifyChanges(); + } + @NotNull @Override public QueryTable getTable() { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 9ed33faee26..441ce71534b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -21,22 +21,25 @@ import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.WindowCheck; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.TestUseOnly; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.time.Duration; -import java.time.Instant; import java.util.Collections; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** - * This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column). + * TimeSeriesFilter filters a timestamp colum within the table for recent rows. * *

- * Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck} - * instead. + * The filtered column must be an Instant or long containing nanoseconds since the epoch. The computation of recency is + * delegated to {@link io.deephaven.engine.util.WindowCheck}. + *

+ * + *

+ * When the filter is not inverted, null rows are not accepted and rows that match the window exactly are accepted. *

*/ public class TimeSeriesFilter @@ -48,18 +51,13 @@ public class TimeSeriesFilter private final Clock clock; private RecomputeListener listener; - // this table contains our window - private QueryTable tableWithWindow; - private String windowSourceName; private Runnable refreshFunctionForUnitTests; /** * We create the merged listener so that we can participate in dependency resolution, but the dependencies are not * actually known until the first call to filter(). */ - private final List windowListenerRecorder = new CopyOnWriteArrayList<>(); - private final List windowDependency = new CopyOnWriteArrayList<>(); - private final TimeSeriesFilterMergedListener mergedListener; + private TimeSeriesFilterMergedListener mergedListener; @SuppressWarnings("UnusedDeclaration") public TimeSeriesFilter(final String columnName, @@ -87,8 +85,6 @@ public TimeSeriesFilter(final String columnName, this.periodNanos = periodNanos; this.invert = invert; this.clock = clock; - this.mergedListener = new TimeSeriesFilterMergedListener( - "TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")"); } @Override @@ -115,34 +111,12 @@ public synchronized WritableRowSet filter( throw new PreviousFilteringNotSupported(); } - if (tableWithWindow == null) { - windowSourceName = "__Window_" + columnName; - while (table.getDefinition().getColumnNames().contains(windowSourceName)) { - windowSourceName = "_" + windowSourceName; - } - final Pair pair = WindowCheck.addTimeWindowInternal(clock, - (QueryTable) table, columnName, periodNanos + 1, windowSourceName, true); - tableWithWindow = (QueryTable) pair.first; - mergedListener.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); - refreshFunctionForUnitTests = pair.second; - - manage(tableWithWindow); - windowDependency.add(tableWithWindow); - final ListenerRecorder recorder = - new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); - tableWithWindow.addUpdateListener(recorder); - recorder.setMergedListener(mergedListener); - - windowListenerRecorder.add(recorder); - - // we are doing the first match, which is based on the entire set of values in the table - mergedListener.insertMatched(fullSet); - } + Assert.neqNull(mergedListener, "mergedListener"); if (invert) { - return selection.minus(mergedListener.inWindowRowset); + return selection.minus(mergedListener.inWindowRowSet); } else { - return selection.intersect(mergedListener.inWindowRowset); + return selection.intersect(mergedListener.inWindowRowSet); } } @@ -185,48 +159,65 @@ public boolean permitParallelization() { } private class TimeSeriesFilterMergedListener extends MergedListener { - final WritableRowSet inWindowRowset = RowSetFactory.empty(); - private ModifiedColumnSet windowColumnSet; - - protected TimeSeriesFilterMergedListener(String listenerDescription) { - super(windowListenerRecorder, windowDependency, listenerDescription, null); + // the list of rows that exist within our window + final WritableRowSet inWindowRowSet = RowSetFactory.empty(); + + // this table contains our window + private final QueryTable tableWithWindow; + // this is our listener recorder for tableWithWindow + private final ListenerRecorder windowRecorder; + + // the name of the window column + private final String windowSourceName; + + private final ModifiedColumnSet windowColumnSet; + + protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable tableWithWindow, + final ListenerRecorder windowRecorder, final String windowSourceName) { + super(Collections.singleton(windowRecorder), Collections.singleton(tableWithWindow), listenerDescription, + null); + this.tableWithWindow = tableWithWindow; + this.windowRecorder = windowRecorder; + this.windowSourceName = windowSourceName; + this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); } @Override protected void process() { synchronized (TimeSeriesFilter.this) { - final ListenerRecorder recorder = windowListenerRecorder.get(0); - Assert.assertion(recorder.recordedVariablesAreValid(), "recorder.recordedVariablesAreValid()"); - final TableUpdate update = recorder.getUpdate().acquire(); + Assert.assertion(windowRecorder.recordedVariablesAreValid(), + "windowRecorder.recordedVariablesAreValid()"); + final TableUpdate update = windowRecorder.getUpdate(); - inWindowRowset.remove(update.removed()); + inWindowRowSet.remove(update.removed()); final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet); if (windowModified) { // we need to check on the modified rows; they may be in the window, - inWindowRowset.remove(update.getModifiedPreShift()); + inWindowRowSet.remove(update.getModifiedPreShift()); } - update.shifted().apply(inWindowRowset); + update.shifted().apply(inWindowRowSet); if (windowModified) { - insertMatched(update.modified()); + final RowSet newlyMatched = insertMatched(update.modified()); + if (invert) { + if (newlyMatched.isNonempty()) { + listener.requestRecompute(newlyMatched); + } + } else if (newlyMatched.size() != update.modified().size()) { + listener.requestRecompute(update.modified().minus(newlyMatched)); + } } insertMatched(update.added()); - - if (invert) { - listener.requestRecomputeUnmatched(); - } else { - listener.requestRecomputeMatched(); - } } } - private void insertMatched(final RowSet rowSet) { + private RowSet insertMatched(final RowSet rowSet) { // The original filter did not include nulls for a regular filter, so we do not include them here either to - // maintain compatibility. That also means the inverted filter is going to include nulls (as the null is + // maintain compatibility. That also means the inverted filter is going to include nulls (as the null is // less than the current time using Deephaven long comparisons). - try (final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, - null, rowSet, Boolean.TRUE)) { - inWindowRowset.insert(matched); - } + final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, + null, rowSet, Boolean.TRUE); + inWindowRowSet.insert(matched); + return matched; } } @@ -234,4 +225,36 @@ private void insertMatched(final RowSet rowSet) { void runForUnitTests() { refreshFunctionForUnitTests.run(); } + + @Override + public SafeCloseable beginOperation(@NotNull Table sourceTable) { + String windowSourceName = "__Window_" + columnName; + while (sourceTable.getDefinition().getColumnNames().contains(windowSourceName)) { + windowSourceName = "_" + windowSourceName; + } + + final Pair pair = WindowCheck.addTimeWindowInternal(clock, + (QueryTable) sourceTable, columnName, periodNanos + 1, windowSourceName, true); + final QueryTable tableWithWindow = (QueryTable) pair.first; + refreshFunctionForUnitTests = pair.second; + + manage(tableWithWindow); + + final ListenerRecorder recorder = + new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); + tableWithWindow.addUpdateListener(recorder); + + mergedListener = new TimeSeriesFilterMergedListener( + "TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")", + tableWithWindow, recorder, windowSourceName); + manage(mergedListener); + + recorder.setMergedListener(mergedListener); + + // we are doing the first match, which is based on the entire set of values in the table + mergedListener.insertMatched(sourceTable.getRowSet()); + + // the only thing we hold is our mergedListener, which in turn holds the recorder and the windowed table + return null; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java index 5c17955ccde..b9614b7a6fd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java @@ -58,17 +58,23 @@ interface RecomputeListener { void requestRecompute(); /** - * Notify the something about the filters has changed such that all unmatched rows of the source table should be + * Notify that something about the filters has changed such that all unmatched rows of the source table should be * re-evaluated. */ void requestRecomputeUnmatched(); /** - * Notify the something about the filters has changed such that all matched rows of the source table should be + * Notify that something about the filters has changed such that all matched rows of the source table should be * re-evaluated. */ void requestRecomputeMatched(); + /** + * Notify that something about the filters has changed such that the following rows of the source table should + * be re-evaluated. + */ + void requestRecompute(RowSet rowSet); + /** * Get the table underlying this listener. * From b332d33d498f8fac4856e8a3aec6e4d6a4fb1143 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 15:51:21 -0400 Subject: [PATCH 08/20] spotless. --- .../deephaven/engine/table/impl/select/TimeSeriesFilter.java | 2 +- .../io/deephaven/engine/table/impl/select/WhereFilter.java | 4 ++-- .../engine/table/impl/select/TestTimeSeriesFilter.java | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 441ce71534b..4297a0ef839 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -215,7 +215,7 @@ private RowSet insertMatched(final RowSet rowSet) { // maintain compatibility. That also means the inverted filter is going to include nulls (as the null is // less than the current time using Deephaven long comparisons). final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, - null, rowSet, Boolean.TRUE); + null, rowSet, Boolean.TRUE); inWindowRowSet.insert(matched); return matched; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java index b9614b7a6fd..04c589fa0a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java @@ -58,8 +58,8 @@ interface RecomputeListener { void requestRecompute(); /** - * Notify that something about the filters has changed such that all unmatched rows of the source table should be - * re-evaluated. + * Notify that something about the filters has changed such that all unmatched rows of the source table should + * be re-evaluated. */ void requestRecomputeUnmatched(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 9c3892f47ed..6d791ec1631 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -133,4 +133,5 @@ public void testIncremental() throws ParseException { // TODO: test inverted filters // TODO: test actual modifications and additions to the table // TODO: test when nothing actually changes from the window check perspective + // TODO: test that we are not causing more refiltering than necessary (with some counting filters before and after) } From 3f3e23df22a15b6c2f4b67eef3eaac6d25f3640d Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 20:29:21 -0400 Subject: [PATCH 09/20] inversion test. --- .../table/impl/select/TimeSeriesFilter.java | 8 +-- .../impl/select/TestTimeSeriesFilter.java | 67 +++++++++++++++++-- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 4297a0ef839..20637659768 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -198,12 +198,10 @@ protected void process() { update.shifted().apply(inWindowRowSet); if (windowModified) { final RowSet newlyMatched = insertMatched(update.modified()); - if (invert) { - if (newlyMatched.isNonempty()) { - listener.requestRecompute(newlyMatched); + try (final WritableRowSet movedOutOfWindow = update.modified().minus(newlyMatched)) { + if (movedOutOfWindow.isNonempty()) { + listener.requestRecompute(movedOutOfWindow); } - } else if (newlyMatched.size() != update.modified().size()) { - listener.requestRecompute(update.modified().minus(newlyMatched)); } } insertMatched(update.added()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 6d791ec1631..16950f81f8a 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -74,6 +74,54 @@ public void testSimple() { assertEquals(3, filtered.size()); } + public void testInverted() { + Instant[] times = new Instant[10]; + + final long startTime = System.currentTimeMillis() - (10 * times.length); + for (int ii = 0; ii < times.length; ++ii) { + times[ii] = DateTimeUtils.epochNanosToInstant((startTime + (ii * 1000)) * 1000000L); + } + + Table source = TableTools.newTable(TableTools.col("Timestamp", times)); + TableTools.show(source); + + final TestClock testClock = new TestClock().setMillis(startTime); + + final TimeSeriesFilter timeSeriesFilter = + new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), true, testClock); + Table filtered = source.where(timeSeriesFilter); + + TableTools.show(filtered); + assertEquals(0, filtered.size()); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + testClock.addMillis(5000); + timeSeriesFilter.runForUnitTests(); + }); + + TableTools.show(filtered); + assertEquals(0, filtered.size()); + + updateGraph.runWithinUnitTestCycle(() -> { + testClock.addMillis(5000); + timeSeriesFilter.runForUnitTests(); + }); + + System.out.println(testClock); + + TableTools.show(filtered); + assertEquals(5, filtered.size()); + + updateGraph.runWithinUnitTestCycle(() -> { + testClock.addMillis(2000); + timeSeriesFilter.runForUnitTests(); + }); + + TableTools.show(filtered); + assertEquals(7, filtered.size()); + } + public void testIncremental() throws ParseException { Random random = new Random(0); @@ -89,18 +137,24 @@ public void testIncremental() throws ParseException { final TestClock testClock = new TestClock().setMillis(startDate.getTime()); - final TimeSeriesFilter unitTestTimeSeriesFilter = + final TimeSeriesFilter inclusionFilter = new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + final TimeSeriesFilter exclusionFilter = + new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); final ArrayList> filtersToRefresh = new ArrayList<>(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final Table withInstant = table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)"); EvalNugget[] en = new EvalNugget[] { EvalNugget.from(() -> { - TimeSeriesFilter unitTestTimeSeriesFilter1 = unitTestTimeSeriesFilter.copy(); - filtersToRefresh.add(new WeakReference<>(unitTestTimeSeriesFilter1)); - return updateGraph.exclusiveLock().computeLocked( - () -> table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)") - .where(unitTestTimeSeriesFilter1)); + final TimeSeriesFilter inclusionCopy = inclusionFilter.copy(); + filtersToRefresh.add(new WeakReference<>(inclusionCopy)); + return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(inclusionCopy)); + }), + EvalNugget.from(() -> { + final TimeSeriesFilter exclusionCopy = exclusionFilter.copy(); + filtersToRefresh.add(new WeakReference<>(exclusionCopy)); + return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(exclusionCopy)); }), }; @@ -130,7 +184,6 @@ public void testIncremental() throws ParseException { } // TODO: test in a sequence of filters, with a dynamic where filter in front of us - // TODO: test inverted filters // TODO: test actual modifications and additions to the table // TODO: test when nothing actually changes from the window check perspective // TODO: test that we are not causing more refiltering than necessary (with some counting filters before and after) From a362df17fb825008b119c13b945d0c7f206571a0 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 18 Sep 2024 21:53:05 -0400 Subject: [PATCH 10/20] Add test for filter recomputation counts. --- .../impl/select/TestTimeSeriesFilter.java | 232 ++++++++++++++++-- 1 file changed, 206 insertions(+), 26 deletions(-) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 16950f81f8a..14559cc8bfc 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -3,8 +3,12 @@ // package io.deephaven.engine.table.impl.select; +import io.deephaven.api.filter.Filter; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.util.TestClock; import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.generator.DateGenerator; @@ -13,6 +17,8 @@ import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.time.DateTimeUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.jetbrains.annotations.NotNull; import java.lang.ref.WeakReference; import java.text.ParseException; @@ -20,10 +26,13 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; -import static io.deephaven.engine.testutil.TstUtils.getTable; -import static io.deephaven.engine.testutil.TstUtils.initColumnInfos; +import static io.deephaven.engine.testutil.TstUtils.*; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; public class TestTimeSeriesFilter extends RefreshingTableTestCase { public void testSimple() { @@ -144,8 +153,66 @@ public void testIncremental() throws ParseException { final ArrayList> filtersToRefresh = new ArrayList<>(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter); + + + int updatesPerTick = 3; + for (int ii = 0; ii < 24 * (updatesPerTick + 1); ++ii) { + if (ii % (updatesPerTick + 1) > 0) { + simulateShiftAwareStep(size, random, table, columnInfo, en); + } else { + updateGraph.runWithinUnitTestCycle(() -> refreshFilters(testClock, filtersToRefresh, 3600 * 1000)); + TstUtils.validate("time update " + ii, en); + } + } + } + + public void testIncremental2() throws ParseException { + Random random = new Random(0); + + final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); + + ColumnInfo[] columnInfo; + int size = 100; + final Date startDate = format.parse("2015-03-23"); + Date endDate = format.parse("2015-03-24"); + final QueryTable table = getTable(size, random, columnInfo = initColumnInfos(new String[] {"Date", "C1"}, + new DateGenerator(startDate, endDate), + new IntGenerator(1, 100))); + + final TestClock testClock = new TestClock().setMillis(startDate.getTime()); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final TimeSeriesFilter inclusionFilter = + new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + final TimeSeriesFilter exclusionFilter = + new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); + final ArrayList> filtersToRefresh = new ArrayList<>(); + + EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter); + + final MutableInt advanced = new MutableInt(0); + while (advanced.intValue() < 24 * 3600 * 1000) { + updateGraph.runWithinUnitTestCycle(() -> { + if (random.nextBoolean()) { + final int toAdvance = random.nextInt(1800 * 1000); + refreshFilters(testClock, filtersToRefresh, toAdvance); + advanced.add(toAdvance); + } + if (random.nextBoolean()) { + GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, size, + random, table, columnInfo); + } + }); + TstUtils.validate("incremental2", en); + } + } + + private static EvalNugget @NotNull [] makeNuggets(QueryTable table, TimeSeriesFilter inclusionFilter, + ArrayList> filtersToRefresh, ControlledUpdateGraph updateGraph, + TimeSeriesFilter exclusionFilter) { final Table withInstant = table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)"); - EvalNugget[] en = new EvalNugget[] { + return new EvalNugget[] { EvalNugget.from(() -> { final TimeSeriesFilter inclusionCopy = inclusionFilter.copy(); filtersToRefresh.add(new WeakReference<>(inclusionCopy)); @@ -157,34 +224,147 @@ public void testIncremental() throws ParseException { return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(exclusionCopy)); }), }; + } + private static void refreshFilters(final TestClock testClock, + final List> filtersToRefresh, final int millisToAdvance) { + testClock.addMillis(millisToAdvance); - int updatesPerTick = 3; - for (int ii = 0; ii < 24 * (updatesPerTick + 1); ++ii) { - if (ii % (updatesPerTick + 1) > 0) { - simulateShiftAwareStep(size, random, table, columnInfo, en); + final List> collectedRefs = new ArrayList<>(); + for (WeakReference ref : filtersToRefresh) { + final TimeSeriesFilter refreshFilter = ref.get(); + if (refreshFilter == null) { + collectedRefs.add(ref); } else { - updateGraph.runWithinUnitTestCycle(() -> { - testClock.addMillis(3600 * 1000); - - final ArrayList> collectedRefs = new ArrayList<>(); - for (WeakReference ref : filtersToRefresh) { - final TimeSeriesFilter refreshFilter = ref.get(); - if (refreshFilter == null) { - collectedRefs.add(ref); - } else { - refreshFilter.runForUnitTests(); - } - } - filtersToRefresh.removeAll(collectedRefs); - }); - TstUtils.validate("time update " + ii, en); + refreshFilter.runForUnitTests(); } } + filtersToRefresh.removeAll(collectedRefs); + } + + private static class CountingFilter extends WhereFilterImpl { + final AtomicLong count; + + CountingFilter() { + count = new AtomicLong(0); + } + + CountingFilter(final AtomicLong count) { + this.count = count; + } + + @Override + public List getColumns() { + return List.of("Sentinel"); + } + + @Override + public List getColumnArrays() { + return List.of(); + } + + @Override + public void init(@NotNull TableDefinition tableDefinition) { + + } + + @Override + public @NotNull WritableRowSet filter(@NotNull RowSet selection, @NotNull RowSet fullSet, @NotNull Table table, + boolean usePrev) { + count.addAndGet(selection.size()); + return selection.copy(); + } + + @Override + public boolean isSimpleFilter() { + return true; + } + + @Override + public void setRecomputeListener(RecomputeListener result) { + + } + + @Override + public WhereFilter copy() { + return new CountingFilter(count); + } } - // TODO: test in a sequence of filters, with a dynamic where filter in front of us - // TODO: test actual modifications and additions to the table - // TODO: test when nothing actually changes from the window check perspective - // TODO: test that we are not causing more refiltering than necessary (with some counting filters before and after) + public void testFilterSequence() { + final long start = DateTimeUtils.epochNanos(DateTimeUtils.parseInstant("2024-09-18T21:29:00 NY")); + final QueryTable source = testRefreshingTable(i().toTracking(), longCol("Timestamp"), intCol("Sentinel")); + final TestClock testClock = new TestClock().setNanos(start); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final CountingFilter cfi1 = new CountingFilter(); + final CountingFilter cfi2 = new CountingFilter(); + final CountingFilter cfe1 = new CountingFilter(); + final CountingFilter cfe2 = new CountingFilter(); + + final TimeSeriesFilter inclusionFilter = + new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + final TimeSeriesFilter exclusionFilter = + new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); + + final Table inclusion = source.where(Filter.and(cfi1, inclusionFilter, cfi2)); + final Table exclusion = source.where(Filter.and(cfe1, exclusionFilter, cfe2)); + + assertEquals(0, cfi1.count.intValue()); + assertEquals(0, cfi2.count.intValue()); + assertEquals(0, cfe1.count.intValue()); + assertEquals(0, cfe2.count.intValue()); + + updateGraph.runWithinUnitTestCycle(() -> { + TstUtils.addToTable(source, i(10), longCol("Timestamp", start), intCol("Sentinel", 10)); + source.notifyListeners(i(10), i(), i()); + inclusionFilter.runForUnitTests(); + exclusionFilter.runForUnitTests(); + }); + + TableTools.show(inclusion); + TableTools.show(exclusion); + + assertEquals(1, cfi1.count.intValue()); + assertEquals(1, cfi2.count.intValue()); + assertEquals(1, cfe1.count.intValue()); + assertEquals(0, cfe2.count.intValue()); + + + updateGraph.runWithinUnitTestCycle(() -> { + TstUtils.addToTable(source, i(10, 20, 30), + longCol("Timestamp", start, start + 300_000_000_000L, start + 4200_000_000_000L), + intCol("Sentinel", 10, 20, 30)); + source.notifyListeners(i(20, 30), i(), i(10)); + inclusionFilter.runForUnitTests(); + exclusionFilter.runForUnitTests(); + }); + + assertEquals(4, cfi1.count.intValue()); + assertEquals(4, cfi2.count.intValue()); + assertEquals(4, cfe1.count.intValue()); + assertEquals(0, cfe2.count.intValue()); + + updateGraph.runWithinUnitTestCycle(() -> { + testClock.addMillis(3_700_000L); + inclusionFilter.runForUnitTests(); + exclusionFilter.runForUnitTests(); + }); + + assertEquals(5, cfi1.count.intValue()); + assertEquals(4, cfi2.count.intValue()); + assertEquals(5, cfe1.count.intValue()); + assertEquals(1, cfe2.count.intValue()); + + updateGraph.runWithinUnitTestCycle(() -> { + testClock.addMillis(300_000L); + inclusionFilter.runForUnitTests(); + exclusionFilter.runForUnitTests(); + }); + + assertEquals(6, cfi1.count.intValue()); + assertEquals(4, cfi2.count.intValue()); + assertEquals(6, cfe1.count.intValue()); + assertEquals(2, cfe2.count.intValue()); + } } From 71f478679656d63ca47ad3b8d30916ff3aee448c Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 19 Sep 2024 06:25:05 -0400 Subject: [PATCH 11/20] Javadoc. --- .../table/impl/select/TimeSeriesFilter.java | 87 +++++++++++++++---- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 20637659768..2d76a21570a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -10,10 +10,7 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.table.TableUpdate; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.ListenerRecorder; import io.deephaven.engine.table.impl.MergedListener; import io.deephaven.engine.table.impl.QueryTable; @@ -51,31 +48,91 @@ public class TimeSeriesFilter private final Clock clock; private RecomputeListener listener; + + /** + * For unit tests, we must be able to cause the window check to update. + */ private Runnable refreshFunctionForUnitTests; /** - * We create the merged listener so that we can participate in dependency resolution, but the dependencies are not - * actually known until the first call to filter(). + * The merged listener is responsible for listening to the WindowCheck result, updating our rowset that contains the + * rows inside of our window, and then notifying the WhereListener that we are requesting recomputation. */ private TimeSeriesFilterMergedListener mergedListener; + /** + * Create a TimeSeriesFilter on the given column for the given period + * + * @param columnName the name of the timestamp column + * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String). + */ @SuppressWarnings("UnusedDeclaration") public TimeSeriesFilter(final String columnName, final String period) { this(columnName, DateTimeUtils.parseDurationNanos(period)); } + /** + * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. + * + * @param columnName the name of the timestamp column + * @param nanos the duration of the window in nanoseconds + */ public TimeSeriesFilter(final String columnName, final long nanos) { this(columnName, nanos, false); } + /** + * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. + * + *

+ * The filter may be inverted, meaning that instead of including recent rows within the window it only + * includes rows outside the window. + *

+ * + * @param columnName the name of the timestamp column + * @param nanos the duration of the window in nanoseconds + * @param invert true if only rows outside the window should be included in the result + */ public TimeSeriesFilter(final String columnName, final long nanos, final boolean invert) { this(columnName, nanos, invert, null); } + /** + * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. + * + *

+ * The filter may be inverted, meaning that instead of including recent rows within the window it only + * includes rows outside the window. + *

+ * + * @param columnName the name of the timestamp column + * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String). + * @param invert true if only rows outside the window should be included in the result + */ + public TimeSeriesFilter(final String columnName, + final String period, + final boolean invert) { + this(columnName, DateTimeUtils.parseDurationNanos(period), invert, null); + } + + /** + * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. + * + *

+ * The filter may be inverted, meaning that instead of including recent rows within the window it only + * includes rows outside the window. + *

+ * + * @param columnName the name of the timestamp column + * @param periodNanos the duration of the window in nanoseconds + * @param invert true if only rows outside the window should be included in the result + * @param clock the Clock to use as a time source for this filter, when null the clock supplied by + * {@link DateTimeUtils#currentClock()} is used. + */ public TimeSeriesFilter(final String columnName, final long periodNanos, final boolean invert, @@ -162,24 +219,22 @@ private class TimeSeriesFilterMergedListener extends MergedListener { // the list of rows that exist within our window final WritableRowSet inWindowRowSet = RowSetFactory.empty(); - // this table contains our window - private final QueryTable tableWithWindow; // this is our listener recorder for tableWithWindow private final ListenerRecorder windowRecorder; - // the name of the window column - private final String windowSourceName; - + // the columnset that represents our window source private final ModifiedColumnSet windowColumnSet; + // the column source containing the window; which we match on + private final ColumnSource windowColumnSource; + protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable tableWithWindow, final ListenerRecorder windowRecorder, final String windowSourceName) { super(Collections.singleton(windowRecorder), Collections.singleton(tableWithWindow), listenerDescription, null); - this.tableWithWindow = tableWithWindow; this.windowRecorder = windowRecorder; - this.windowSourceName = windowSourceName; this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); + this.windowColumnSource = tableWithWindow.getColumnSource(windowSourceName); } @Override @@ -212,13 +267,15 @@ private RowSet insertMatched(final RowSet rowSet) { // The original filter did not include nulls for a regular filter, so we do not include them here either to // maintain compatibility. That also means the inverted filter is going to include nulls (as the null is // less than the current time using Deephaven long comparisons). - final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false, - null, rowSet, Boolean.TRUE); + final RowSet matched = windowColumnSource.match(false, false, false, null, rowSet, Boolean.TRUE); inWindowRowSet.insert(matched); return matched; } } + /** + * For test uses, causes the WindowCheck to update rows based on the current value of clock. + */ @TestUseOnly void runForUnitTests() { refreshFunctionForUnitTests.run(); From f577dceeccd31f05cf4d35530421cd5febd4cb5d Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 19 Sep 2024 07:09:35 -0400 Subject: [PATCH 12/20] javadoc --- .../deephaven/engine/table/impl/select/TimeSeriesFilter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 2d76a21570a..b9161b54e31 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -64,7 +64,7 @@ public class TimeSeriesFilter * Create a TimeSeriesFilter on the given column for the given period * * @param columnName the name of the timestamp column - * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String). + * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String)}. */ @SuppressWarnings("UnusedDeclaration") public TimeSeriesFilter(final String columnName, @@ -110,7 +110,7 @@ public TimeSeriesFilter(final String columnName, *

* * @param columnName the name of the timestamp column - * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String). + * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String)}. * @param invert true if only rows outside the window should be included in the result */ public TimeSeriesFilter(final String columnName, From 247f1d83b3cd7c0637799694459043fd3be7471e Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 19 Sep 2024 15:50:25 -0400 Subject: [PATCH 13/20] changes and todos for the rest. --- .../engine/table/impl/QueryTable.java | 26 +++---- .../table/impl/WouldMatchOperation.java | 6 +- .../table/impl/select/TimeSeriesFilter.java | 74 ++++++++++--------- .../engine/table/impl/select/WhereFilter.java | 2 +- .../io/deephaven/engine/util/WindowCheck.java | 25 ++++--- .../engine/table/impl/util/TestClock.java | 2 + 6 files changed, 72 insertions(+), 63 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 27d2ca50e1e..2acaccec323 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1076,7 +1076,7 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(source.getRowSet().copy()); filterExecution.scheduleCompletion( - (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), + (matchedRows, unusedMods) -> completeRefilterUpdate(listener, upstream, update, matchedRows), exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = refilterUnmatchedRequested = false; refilterRequestedRowset = null; @@ -1091,9 +1091,8 @@ void doRefilter( unmatchedRows.insert(refilterRequestedRowset); refilterRequestedRowset = null; } - final RowSet unmatched = unmatchedRows.copy(); - final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched); - filterExecution.scheduleCompletion((adds, mods) -> { + final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatchedRows); + filterExecution.scheduleCompletion((adds, unusedMods) -> { final WritableRowSet newMapping = adds.writableCast(); // add back what we previously matched, but for modifications and removals try (final WritableRowSet previouslyMatched = getRowSet().copy()) { @@ -1118,16 +1117,15 @@ void doRefilter( matchedRows.insert(refilterRequestedRowset); refilterRequestedRowset = null; } - final RowSet matchedClone = matchedRows.copy(); final WhereListener.ListenerFilterExecution filterExecution = - listener.makeRefilterExecution(matchedClone); + listener.makeRefilterExecution(matchedRows); filterExecution.scheduleCompletion( - (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds), + (adds, unusedMods) -> completeRefilterUpdate(listener, upstream, update, adds), exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = false; } else if (refilterRequestedRowset != null) { - final WritableRowSet rowsToFilter = refilterRequestedRowset.copy(); + final WritableRowSet rowsToFilter = refilterRequestedRowset; if (upstream != null) { rowsToFilter.insert(upstream.added()); rowsToFilter.insert(upstream.modified()); @@ -1136,9 +1134,9 @@ void doRefilter( final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(rowsToFilter); - filterExecution.scheduleCompletion((adds, mods) -> { + filterExecution.scheduleCompletion((adds, unusedMods) -> { final WritableRowSet newMapping = adds.writableCast(); - // add back what we previously matched, but for modifications and removals + // add back what we previously matched, except for modifications and removals try (final WritableRowSet previouslyMatched = getRowSet().copy()) { previouslyMatched.remove(rowsToFilter); newMapping.insert(previouslyMatched); @@ -1162,20 +1160,18 @@ private void completeRefilterUpdate( update.added = newMapping.minus(getRowSet()); final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping); - // Update our index in post-shift keyspace. - getRowSet().writableCast().remove(postShiftRemovals); - getRowSet().writableCast().insert(update.added); + getRowSet().writableCast().resetTo(newMapping); // Note that removed must be propagated to listeners in pre-shift keyspace. if (upstream != null) { upstream.shifted().unapply(postShiftRemovals); } - update.removed.writableCast().insert(postShiftRemovals); + update.removed = postShiftRemovals; if (upstream == null || upstream.modified().isEmpty()) { update.modified = RowSetFactory.empty(); } else { - update.modified = upstream.modified().intersect(newMapping); + update.modified = upstream.modified().intersect(getRowSet()); update.modified.writableCast().remove(update.added); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java index 99f00b9bef3..eae78588441 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/WouldMatchOperation.java @@ -432,21 +432,21 @@ public void requestRecompute() { @Override public void requestRecomputeUnmatched() { - // TODO: No need to recompute matched rows + // TODO: No need to recompute matched rows (https://github.com/deephaven/deephaven-core/issues/6083) doRecompute = true; Require.neqNull(mergedListener, "mergedListener").notifyChanges(); } @Override public void requestRecomputeMatched() { - // TODO: No need to recompute unmatched rows + // TODO: No need to recompute unmatched rows (https://github.com/deephaven/deephaven-core/issues/6083) doRecompute = true; Require.neqNull(mergedListener, "mergedListener").notifyChanges(); } @Override public void requestRecompute(RowSet rowSet) { - // TODO: No need to recompute the remaining rows + // TODO: No need to recompute the remaining rows (https://github.com/deephaven/deephaven-core/issues/6083) doRecompute = true; Require.neqNull(mergedListener, "mergedListener").notifyChanges(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index b9161b54e31..fb88d2d98cd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -6,7 +6,6 @@ import io.deephaven.base.Pair; import io.deephaven.base.clock.Clock; import io.deephaven.base.verify.Assert; -import io.deephaven.base.verify.Require; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.RowSet; @@ -56,7 +55,7 @@ public class TimeSeriesFilter /** * The merged listener is responsible for listening to the WindowCheck result, updating our rowset that contains the - * rows inside of our window, and then notifying the WhereListener that we are requesting recomputation. + * rows in our window, and then notifying the WhereListener that we are requesting recomputation. */ private TimeSeriesFilterMergedListener mergedListener; @@ -76,29 +75,31 @@ public TimeSeriesFilter(final String columnName, * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. * * @param columnName the name of the timestamp column - * @param nanos the duration of the window in nanoseconds + * @param periodNanos the duration of the window in nanoseconds */ public TimeSeriesFilter(final String columnName, - final long nanos) { - this(columnName, nanos, false); + final long periodNanos) { + this(columnName, periodNanos, false); } + // TODO: USE A BUILDER FOR THE CONSTRUCTOR + /** * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. * *

* The filter may be inverted, meaning that instead of including recent rows within the window it only - * includes rows outside the window. + * includes rows outside the window or that are null. *

* * @param columnName the name of the timestamp column - * @param nanos the duration of the window in nanoseconds + * @param periodNanos the duration of the window in nanoseconds * @param invert true if only rows outside the window should be included in the result */ public TimeSeriesFilter(final String columnName, - final long nanos, + final long periodNanos, final boolean invert) { - this(columnName, nanos, invert, null); + this(columnName, periodNanos, invert, null); } /** @@ -137,7 +138,6 @@ public TimeSeriesFilter(final String columnName, final long periodNanos, final boolean invert, @Nullable final Clock clock) { - Require.gtZero(periodNanos, "periodNanos"); this.columnName = columnName; this.periodNanos = periodNanos; this.invert = invert; @@ -159,7 +159,7 @@ public void init(@NotNull final TableDefinition tableDefinition) {} @NotNull @Override - public synchronized WritableRowSet filter( + public WritableRowSet filter( @NotNull final RowSet selection, @NotNull final RowSet fullSet, @NotNull final Table table, @@ -212,9 +212,21 @@ public boolean isRefreshing() { @Override public boolean permitParallelization() { + // there is no reason to parallelize this filter, because the actual filtering is only a simple rowset operation + // and parallelization would cost more than actually applying the rowset operation once return false; } + @Override + public String toString() { + return "TimeSeriesFilter{" + + "columnName='" + columnName + '\'' + + ", periodNanos=" + periodNanos + + ", invert=" + invert + + '}'; + } + + // TODO: we should be listener not a merged listener private class TimeSeriesFilterMergedListener extends MergedListener { // the list of rows that exist within our window final WritableRowSet inWindowRowSet = RowSetFactory.empty(); @@ -239,28 +251,26 @@ protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable @Override protected void process() { - synchronized (TimeSeriesFilter.this) { - Assert.assertion(windowRecorder.recordedVariablesAreValid(), - "windowRecorder.recordedVariablesAreValid()"); - final TableUpdate update = windowRecorder.getUpdate(); - - inWindowRowSet.remove(update.removed()); - final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet); - if (windowModified) { - // we need to check on the modified rows; they may be in the window, - inWindowRowSet.remove(update.getModifiedPreShift()); - } - update.shifted().apply(inWindowRowSet); - if (windowModified) { - final RowSet newlyMatched = insertMatched(update.modified()); - try (final WritableRowSet movedOutOfWindow = update.modified().minus(newlyMatched)) { - if (movedOutOfWindow.isNonempty()) { - listener.requestRecompute(movedOutOfWindow); - } + Assert.assertion(windowRecorder.recordedVariablesAreValid(), + "windowRecorder.recordedVariablesAreValid()"); + final TableUpdate update = windowRecorder.getUpdate(); + + inWindowRowSet.remove(update.removed()); + final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet); + if (windowModified) { + // we need to check on the modified rows; they may be in the window, + inWindowRowSet.remove(update.getModifiedPreShift()); + } + update.shifted().apply(inWindowRowSet); + if (windowModified) { + final RowSet newlyMatched = insertMatched(update.modified()); + try (final WritableRowSet movedOutOfWindow = update.modified().minus(newlyMatched)) { + if (movedOutOfWindow.isNonempty()) { + listener.requestRecompute(movedOutOfWindow); } } - insertMatched(update.added()); } + insertMatched(update.added()); } private RowSet insertMatched(final RowSet rowSet) { @@ -284,7 +294,7 @@ void runForUnitTests() { @Override public SafeCloseable beginOperation(@NotNull Table sourceTable) { String windowSourceName = "__Window_" + columnName; - while (sourceTable.getDefinition().getColumnNames().contains(windowSourceName)) { + while (sourceTable.hasColumns(windowSourceName)) { windowSourceName = "_" + windowSourceName; } @@ -293,8 +303,6 @@ public SafeCloseable beginOperation(@NotNull Table sourceTable) { final QueryTable tableWithWindow = (QueryTable) pair.first; refreshFunctionForUnitTests = pair.second; - manage(tableWithWindow); - final ListenerRecorder recorder = new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); tableWithWindow.addUpdateListener(recorder); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java index 04c589fa0a1..faac0d76b1a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java @@ -71,7 +71,7 @@ interface RecomputeListener { /** * Notify that something about the filters has changed such that the following rows of the source table should - * be re-evaluated. + * be re-evaluated. The rowSet ownership is not taken by requestRecompute. */ void requestRecompute(RowSet rowSet); diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 1af89c56855..7360bd43d5b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -117,7 +117,6 @@ public static Pair addTimeWindowInternal(Clock clock, } timeWindowListenerImpl.addRowSequence(table.getRowSet(), false); result.addParentReference(timeWindowListenerImpl); - result.manage(table); if (addToMonitor) { result.getUpdateGraph().addSource(timeWindowListenerImpl); } @@ -210,9 +209,10 @@ public String toString() { * @param source the source table * @param result our initialized result table */ + private TimeWindowListenerImpl(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource, final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { - super(Collections.singleton(recorder), Collections.singleton(source), "WindowCheck", result); + super(Collections.singleton(recorder), List.of(), "WindowCheck", result); this.source = source; this.recorder = recorder; this.inWindowColumnSource = inWindowColumnSource; @@ -1062,9 +1062,10 @@ private long timeStampForPrev() { @Override public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseInsensitive, @Nullable DataIndex dataIndex, @NotNull RowSet mapper, Object... keys) { - final boolean includeNull = Arrays.asList(keys).contains(null) ^ invertMatch; - final boolean includeTrue = Arrays.asList(keys).contains(true) ^ invertMatch; - final boolean includeFalse = Arrays.asList(keys).contains(false) ^ invertMatch; + final List keysList = Arrays.asList(keys); + final boolean includeNull = keysList.contains(null) ^ invertMatch; + final boolean includeTrue = keysList.contains(true) ^ invertMatch; + final boolean includeFalse = keysList.contains(false) ^ invertMatch; final int getSize = (int) Math.min(4096, mapper.size()); @@ -1081,18 +1082,20 @@ public WritableRowSet match(boolean invertMatch, boolean usePrev, boolean caseIn } else { timeStamps = timeStampSource.getChunk(getContext, chunkRs).asLongChunk(); } - for (int ii = 0; ii < rowKeys.size(); ++ii) { + final int chunkSize = rowKeys.size(); + for (int ii = 0; ii < chunkSize; ++ii) { final long rowKey = rowKeys.get(ii); - if (timeStamps.get(ii) == QueryConstants.NULL_LONG) { + final Boolean inWindow = computeInWindow(timeStamps.get(ii), currentTime); + if (inWindow == null) { if (includeNull) { builder.appendKey(rowKey); } - } else { - final boolean inWindow = computeInWindowUnsafe(timeStamps.get(ii), currentTime); - if (includeTrue && inWindow) { + } else if (inWindow) { + if (includeTrue) { builder.appendKey(rowKey); } - if (includeFalse && !inWindow) { + } else { + if (includeFalse) { builder.appendKey(rowKey); } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java index 2279966a512..4327a61bcd7 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java @@ -9,6 +9,8 @@ import java.time.Instant; import java.time.ZoneId; +// TODO: MOVE THIS TO THE EXISTING TESTCLOCK + /** * A clock that has a fixed time for use in unit tests. */ From 22a2975e6b689cf880e7eb3e961944b1d1a633ec Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 19 Sep 2024 16:29:37 -0400 Subject: [PATCH 14/20] simpler listener --- .../engine/table/impl/QueryTable.java | 13 ++--- .../table/impl/select/TimeSeriesFilter.java | 52 +++++++------------ .../io/deephaven/engine/util/WindowCheck.java | 1 - 3 files changed, 25 insertions(+), 41 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 2acaccec323..a9d15f9fb38 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1158,15 +1158,16 @@ private void completeRefilterUpdate( final RowSet newMapping) { // Compute added/removed in post-shift keyspace. update.added = newMapping.minus(getRowSet()); - final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping); - getRowSet().writableCast().resetTo(newMapping); + try (final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping)) { + getRowSet().writableCast().resetTo(newMapping); - // Note that removed must be propagated to listeners in pre-shift keyspace. - if (upstream != null) { - upstream.shifted().unapply(postShiftRemovals); + // Note that removed must be propagated to listeners in pre-shift keyspace. + if (upstream != null) { + upstream.shifted().unapply(postShiftRemovals); + } + update.removed.writableCast().insert(postShiftRemovals); } - update.removed = postShiftRemovals; if (upstream == null || upstream.modified().isEmpty()) { update.modified = RowSetFactory.empty(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index fb88d2d98cd..d41b3135eac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -10,8 +10,8 @@ import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.*; +import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter; import io.deephaven.engine.table.impl.ListenerRecorder; -import io.deephaven.engine.table.impl.MergedListener; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; @@ -54,10 +54,10 @@ public class TimeSeriesFilter private Runnable refreshFunctionForUnitTests; /** - * The merged listener is responsible for listening to the WindowCheck result, updating our rowset that contains the + * The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the * rows in our window, and then notifying the WhereListener that we are requesting recomputation. */ - private TimeSeriesFilterMergedListener mergedListener; + private TimeSeriesFilterWindowListener windowListener; /** * Create a TimeSeriesFilter on the given column for the given period @@ -168,12 +168,12 @@ public WritableRowSet filter( throw new PreviousFilteringNotSupported(); } - Assert.neqNull(mergedListener, "mergedListener"); + Assert.neqNull(windowListener, "windowListener"); if (invert) { - return selection.minus(mergedListener.inWindowRowSet); + return selection.minus(windowListener.inWindowRowSet); } else { - return selection.intersect(mergedListener.inWindowRowSet); + return selection.intersect(windowListener.inWindowRowSet); } } @@ -192,7 +192,7 @@ public void setRecomputeListener(RecomputeListener listener) { @Override public boolean satisfied(long step) { - return mergedListener.satisfied(step); + return windowListener.satisfied(step); } @Override @@ -226,35 +226,25 @@ public String toString() { '}'; } - // TODO: we should be listener not a merged listener - private class TimeSeriesFilterMergedListener extends MergedListener { + private class TimeSeriesFilterWindowListener extends InstrumentedTableUpdateListenerAdapter { // the list of rows that exist within our window final WritableRowSet inWindowRowSet = RowSetFactory.empty(); - // this is our listener recorder for tableWithWindow - private final ListenerRecorder windowRecorder; - // the columnset that represents our window source private final ModifiedColumnSet windowColumnSet; // the column source containing the window; which we match on private final ColumnSource windowColumnSource; - protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable tableWithWindow, - final ListenerRecorder windowRecorder, final String windowSourceName) { - super(Collections.singleton(windowRecorder), Collections.singleton(tableWithWindow), listenerDescription, - null); - this.windowRecorder = windowRecorder; + protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow, final String windowSourceName) { + super(listenerDescription, tableWithWindow, false); this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); this.windowColumnSource = tableWithWindow.getColumnSource(windowSourceName); } - @Override - protected void process() { - Assert.assertion(windowRecorder.recordedVariablesAreValid(), - "windowRecorder.recordedVariablesAreValid()"); - final TableUpdate update = windowRecorder.getUpdate(); + @Override + public void onUpdate(TableUpdate update) { inWindowRowSet.remove(update.removed()); final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet); if (windowModified) { @@ -303,21 +293,15 @@ public SafeCloseable beginOperation(@NotNull Table sourceTable) { final QueryTable tableWithWindow = (QueryTable) pair.first; refreshFunctionForUnitTests = pair.second; - final ListenerRecorder recorder = - new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null); - tableWithWindow.addUpdateListener(recorder); - - mergedListener = new TimeSeriesFilterMergedListener( - "TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")", - tableWithWindow, recorder, windowSourceName); - manage(mergedListener); - - recorder.setMergedListener(mergedListener); + windowListener = new TimeSeriesFilterWindowListener("TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")", + tableWithWindow, windowSourceName); + tableWithWindow.addUpdateListener(windowListener); + manage(windowListener); // we are doing the first match, which is based on the entire set of values in the table - mergedListener.insertMatched(sourceTable.getRowSet()); + windowListener.insertMatched(sourceTable.getRowSet()); - // the only thing we hold is our mergedListener, which in turn holds the recorder and the windowed table + // the only thing we hold is our listener, which in turn holds the windowed table return null; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 7360bd43d5b..1cfd13be33e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -209,7 +209,6 @@ public String toString() { * @param source the source table * @param result our initialized result table */ - private TimeWindowListenerImpl(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource, final ListenerRecorder recorder, final QueryTable source, final QueryTable result) { super(Collections.singleton(recorder), List.of(), "WindowCheck", result); From 2a7f6c0f96f721c9dbb50b33b2d98f9c7075c0da Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 19 Sep 2024 17:02:22 -0400 Subject: [PATCH 15/20] Code review, buidler. --- .../engine/table/impl/QueryTable.java | 3 +- .../table/impl/select/TimeSeriesFilter.java | 161 ++++++++++++------ .../engine/table/impl/select/WhereFilter.java | 2 +- .../impl/select/TestTimeSeriesFilter.java | 25 ++- .../engine/table/impl/util/TestClock.java | 82 --------- .../io/deephaven/engine/util/TestClock.java | 41 +++++ 6 files changed, 170 insertions(+), 144 deletions(-) delete mode 100644 engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index a9d15f9fb38..afc185720a9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1091,7 +1091,8 @@ void doRefilter( unmatchedRows.insert(refilterRequestedRowset); refilterRequestedRowset = null; } - final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatchedRows); + final WhereListener.ListenerFilterExecution filterExecution = + listener.makeRefilterExecution(unmatchedRows); filterExecution.scheduleCompletion((adds, unusedMods) -> { final WritableRowSet newMapping = adds.writableCast(); // add back what we previously matched, but for modifications and removals diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index d41b3135eac..aa34bb08e08 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -11,7 +11,6 @@ import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter; -import io.deephaven.engine.table.impl.ListenerRecorder; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; @@ -41,6 +40,107 @@ public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl implements NotificationQueue.Dependency { + /** + * A builder to create a TimeSeriesFilter. + */ + public static class Builder { + private String columnName; + private long periodNanos; + private boolean invert; + private Clock clock; + + private Builder() { + // use newBuilder + } + + /** + * Set the column name to filter on. + * + * @param columnName the column name to filter on, required + * @return this builder + */ + public Builder columnName(String columnName) { + this.columnName = columnName; + return this; + } + + /** + * Set an optional Clock to use for this filter. When not specified, the clock is determined by + * {@link DateTimeUtils#currentClock()}. + * + * @param clock the clock to use for the filter + * @return this builder + */ + public Builder clock(Clock clock) { + this.clock = clock; + return this; + } + + /** + * Set the period for this filter. + * + * @param period the period as a string for the filter + * @return this builder + */ + public Builder period(final String period) { + return period(DateTimeUtils.parseDurationNanos(period)); + } + + /** + * Set the period for this filter. + * + * @param period the period as a Duration for the filter + * @return this builder + */ + public Builder period(Duration period) { + return period(period.toNanos()); + } + + /** + * Set the period for this filter. + * + * @param period the period in nanoseconds for the filter + * @return this builder + */ + public Builder period(long period) { + this.periodNanos = period; + return this; + } + + /** + * Set whether this filter should be inverted. An inverted filter accepts only rows that have null timestamps or + * timestamps older than the period. + * + * @param invert true if the filter should be inverted. + * @return this builder + */ + public Builder invert(boolean invert) { + this.invert = invert; + return this; + } + + /** + * Create the TimeSeriesFilter described by this builder. + * + * @return a TimeSeriesFilter using the options from this builder. + */ + public TimeSeriesFilter build() { + if (columnName == null) { + throw new IllegalArgumentException("Column name is required"); + } + return new TimeSeriesFilter(columnName, periodNanos, invert, clock); + } + } + + /** + * Create a builder for a time series filter. + * + * @return a Builder object to configure a TimeSeriesFilter + */ + public static Builder newBuilder() { + return new Builder(); + } + private final String columnName; private final long periodNanos; private final boolean invert; @@ -54,8 +154,8 @@ public class TimeSeriesFilter private Runnable refreshFunctionForUnitTests; /** - * The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the - * rows in our window, and then notifying the WhereListener that we are requesting recomputation. + * The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the rows + * in our window, and then notifying the WhereListener that we are requesting recomputation. */ private TimeSeriesFilterWindowListener windowListener; @@ -79,62 +179,19 @@ public TimeSeriesFilter(final String columnName, */ public TimeSeriesFilter(final String columnName, final long periodNanos) { - this(columnName, periodNanos, false); - } - - // TODO: USE A BUILDER FOR THE CONSTRUCTOR - - /** - * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. - * - *

- * The filter may be inverted, meaning that instead of including recent rows within the window it only - * includes rows outside the window or that are null. - *

- * - * @param columnName the name of the timestamp column - * @param periodNanos the duration of the window in nanoseconds - * @param invert true if only rows outside the window should be included in the result - */ - public TimeSeriesFilter(final String columnName, - final long periodNanos, - final boolean invert) { - this(columnName, periodNanos, invert, null); + this(columnName, periodNanos, false, null); } /** * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. * - *

- * The filter may be inverted, meaning that instead of including recent rows within the window it only - * includes rows outside the window. - *

- * - * @param columnName the name of the timestamp column - * @param period the duration of the window as parsed by {@link DateTimeUtils#parseDurationNanos(String)}. - * @param invert true if only rows outside the window should be included in the result - */ - public TimeSeriesFilter(final String columnName, - final String period, - final boolean invert) { - this(columnName, DateTimeUtils.parseDurationNanos(period), invert, null); - } - - /** - * Create a TimeSeriesFilter on the given column for the given period in nanoseconds. - * - *

- * The filter may be inverted, meaning that instead of including recent rows within the window it only - * includes rows outside the window. - *

- * * @param columnName the name of the timestamp column * @param periodNanos the duration of the window in nanoseconds - * @param invert true if only rows outside the window should be included in the result + * @param invert true if only rows outside the window (or with a null timestamp) should be included in the result * @param clock the Clock to use as a time source for this filter, when null the clock supplied by * {@link DateTimeUtils#currentClock()} is used. */ - public TimeSeriesFilter(final String columnName, + private TimeSeriesFilter(final String columnName, final long periodNanos, final boolean invert, @Nullable final Clock clock) { @@ -236,7 +293,8 @@ private class TimeSeriesFilterWindowListener extends InstrumentedTableUpdateList // the column source containing the window; which we match on private final ColumnSource windowColumnSource; - protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow, final String windowSourceName) { + protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow, + final String windowSourceName) { super(listenerDescription, tableWithWindow, false); this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName); this.windowColumnSource = tableWithWindow.getColumnSource(windowSourceName); @@ -293,7 +351,8 @@ public SafeCloseable beginOperation(@NotNull Table sourceTable) { final QueryTable tableWithWindow = (QueryTable) pair.first; refreshFunctionForUnitTests = pair.second; - windowListener = new TimeSeriesFilterWindowListener("TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")", + windowListener = new TimeSeriesFilterWindowListener( + "TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")", tableWithWindow, windowSourceName); tableWithWindow.addUpdateListener(windowListener); manage(windowListener); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java index faac0d76b1a..9dae903ae4e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/WhereFilter.java @@ -71,7 +71,7 @@ interface RecomputeListener { /** * Notify that something about the filters has changed such that the following rows of the source table should - * be re-evaluated. The rowSet ownership is not taken by requestRecompute. + * be re-evaluated. The rowSet ownership is not taken by requestRecompute. */ void requestRecompute(RowSet rowSet); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java index 14559cc8bfc..d5cf3646f2c 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java @@ -9,13 +9,13 @@ import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.table.impl.util.TestClock; import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.generator.DateGenerator; import io.deephaven.engine.testutil.generator.IntGenerator; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.util.TestClock; import io.deephaven.time.DateTimeUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.jetbrains.annotations.NotNull; @@ -49,7 +49,7 @@ public void testSimple() { final TestClock testClock = new TestClock().setMillis(startTime); final TimeSeriesFilter timeSeriesFilter = - new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), false, testClock); + TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT00:00:05").clock(testClock).build(); Table filtered = source.where(timeSeriesFilter); TableTools.show(filtered); @@ -97,7 +97,8 @@ public void testInverted() { final TestClock testClock = new TestClock().setMillis(startTime); final TimeSeriesFilter timeSeriesFilter = - new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT00:00:05"), true, testClock); + TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT00:00:05").clock(testClock).invert(true) + .build(); Table filtered = source.where(timeSeriesFilter); TableTools.show(filtered); @@ -147,9 +148,11 @@ public void testIncremental() throws ParseException { final TestClock testClock = new TestClock().setMillis(startDate.getTime()); final TimeSeriesFilter inclusionFilter = - new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(false) + .build(); final TimeSeriesFilter exclusionFilter = - new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); + TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true) + .build(); final ArrayList> filtersToRefresh = new ArrayList<>(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); @@ -184,9 +187,11 @@ public void testIncremental2() throws ParseException { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); final TimeSeriesFilter inclusionFilter = - new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(false) + .build(); final TimeSeriesFilter exclusionFilter = - new TimeSeriesFilter("Date", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); + TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true) + .build(); final ArrayList> filtersToRefresh = new ArrayList<>(); EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter); @@ -303,9 +308,11 @@ public void testFilterSequence() { final CountingFilter cfe2 = new CountingFilter(); final TimeSeriesFilter inclusionFilter = - new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), false, testClock); + TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT01:00:00").clock(testClock) + .invert(false).build(); final TimeSeriesFilter exclusionFilter = - new TimeSeriesFilter("Timestamp", DateTimeUtils.parseDurationNanos("PT01:00:00"), true, testClock); + TimeSeriesFilter.newBuilder().columnName("Timestamp").period("PT01:00:00").clock(testClock).invert(true) + .build(); final Table inclusion = source.where(Filter.and(cfi1, inclusionFilter, cfi2)); final Table exclusion = source.where(Filter.and(cfe1, exclusionFilter, cfe2)); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java deleted file mode 100644 index 4327a61bcd7..00000000000 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestClock.java +++ /dev/null @@ -1,82 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.util; - -import io.deephaven.base.clock.Clock; -import io.deephaven.time.DateTimeUtils; - -import java.time.Instant; -import java.time.ZoneId; - -// TODO: MOVE THIS TO THE EXISTING TESTCLOCK - -/** - * A clock that has a fixed time for use in unit tests. - */ -public class TestClock implements Clock { - private long nanos; - - /** - * Set the time. - * - * @param epochNanos the time to set in nanos since the epoch - * @return this clock - */ - public TestClock setNanos(long epochNanos) { - this.nanos = epochNanos; - return this; - } - - /** - * Set the time. - * - * @param epochMillis the time to set in millis since the epoch - * @return this clock - */ - public TestClock setMillis(long epochMillis) { - this.nanos = DateTimeUtils.millisToNanos(epochMillis); - return this; - } - - /** - * Add millis to the current time. - * - * @param millis the number of millis to add - * @return this clock - */ - public TestClock addMillis(long millis) { - this.nanos += DateTimeUtils.millisToNanos(millis); - return this; - } - - @Override - public long currentTimeMillis() { - return DateTimeUtils.nanosToMillis(nanos); - } - - @Override - public long currentTimeMicros() { - return DateTimeUtils.nanosToMicros(nanos); - } - - @Override - public long currentTimeNanos() { - return nanos; - } - - @Override - public Instant instantNanos() { - return DateTimeUtils.epochNanosToInstant(nanos); - } - - @Override - public Instant instantMillis() { - return instantNanos(); - } - - @Override - public String toString() { - return "TestClock{" + DateTimeUtils.epochNanosToInstant(nanos).atZone(ZoneId.systemDefault()) + "}"; - } -} diff --git a/engine/table/src/test/java/io/deephaven/engine/util/TestClock.java b/engine/table/src/test/java/io/deephaven/engine/util/TestClock.java index 2a7debb4362..0c4ea932f71 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/TestClock.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/TestClock.java @@ -4,6 +4,9 @@ package io.deephaven.engine.util; import io.deephaven.base.clock.ClockNanoBase; +import io.deephaven.time.DateTimeUtils; + +import java.time.ZoneId; public final class TestClock extends ClockNanoBase { public long now; @@ -20,4 +23,42 @@ public TestClock(long nowNanos) { public long currentTimeNanos() { return now; } + + /** + * Set the time. + * + * @param epochNanos the time to set in nanos since the epoch + * @return this clock + */ + public TestClock setNanos(long epochNanos) { + this.now = epochNanos; + return this; + } + + /** + * Set the time. + * + * @param epochMillis the time to set in millis since the epoch + * @return this clock + */ + public TestClock setMillis(long epochMillis) { + this.now = DateTimeUtils.millisToNanos(epochMillis); + return this; + } + + /** + * Add millis to the current time. + * + * @param millis the number of millis to add + * @return this clock + */ + public TestClock addMillis(long millis) { + this.now += DateTimeUtils.millisToNanos(millis); + return this; + } + + @Override + public String toString() { + return "TestClock{" + DateTimeUtils.epochNanosToInstant(now).atZone(ZoneId.systemDefault()) + "}"; + } } From 9f932f1955d67d35fa61f846f92ada3b9443847b Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 24 Sep 2024 13:52:43 -0400 Subject: [PATCH 16/20] close comment --- .../java/io/deephaven/engine/table/impl/QueryTable.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index afc185720a9..ee31098d92c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1079,7 +1079,10 @@ void doRefilter( (matchedRows, unusedMods) -> completeRefilterUpdate(listener, upstream, update, matchedRows), exception -> errorRefilterUpdate(listener, exception, upstream)); refilterMatchedRequested = refilterUnmatchedRequested = false; - refilterRequestedRowset = null; + if (refilterRequestedRowset != null) { + refilterRequestedRowset.close(); + refilterRequestedRowset = null; + } } else if (refilterUnmatchedRequested) { // things that are added or removed are already reflected in source.getRowSet final WritableRowSet unmatchedRows = source.getRowSet().minus(getRowSet()); @@ -1089,6 +1092,7 @@ void doRefilter( } if (refilterRequestedRowset != null) { unmatchedRows.insert(refilterRequestedRowset); + refilterRequestedRowset.close(); refilterRequestedRowset = null; } final WhereListener.ListenerFilterExecution filterExecution = @@ -1116,6 +1120,7 @@ void doRefilter( } if (refilterRequestedRowset != null) { matchedRows.insert(refilterRequestedRowset); + refilterRequestedRowset.close(); refilterRequestedRowset = null; } From 86992b295ca0898f5ced6dd2489a45c5ae4adb22 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 24 Sep 2024 15:38:45 -0400 Subject: [PATCH 17/20] final consistency. --- .../engine/table/impl/select/TimeSeriesFilter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index aa34bb08e08..22a52024308 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -59,7 +59,7 @@ private Builder() { * @param columnName the column name to filter on, required * @return this builder */ - public Builder columnName(String columnName) { + public Builder columnName(final String columnName) { this.columnName = columnName; return this; } @@ -71,7 +71,7 @@ public Builder columnName(String columnName) { * @param clock the clock to use for the filter * @return this builder */ - public Builder clock(Clock clock) { + public Builder clock(final Clock clock) { this.clock = clock; return this; } @@ -92,7 +92,7 @@ public Builder period(final String period) { * @param period the period as a Duration for the filter * @return this builder */ - public Builder period(Duration period) { + public Builder period(final Duration period) { return period(period.toNanos()); } @@ -102,7 +102,7 @@ public Builder period(Duration period) { * @param period the period in nanoseconds for the filter * @return this builder */ - public Builder period(long period) { + public Builder period(final long period) { this.periodNanos = period; return this; } @@ -114,7 +114,7 @@ public Builder period(long period) { * @param invert true if the filter should be inverted. * @return this builder */ - public Builder invert(boolean invert) { + public Builder invert(final boolean invert) { this.invert = invert; return this; } From a02dba457627a1dde636e544b30c84c0294c932e Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 24 Sep 2024 15:43:01 -0400 Subject: [PATCH 18/20] we need to write a test for this case. --- .../main/java/io/deephaven/engine/table/impl/QueryTable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index ee31098d92c..b622bfab69d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1162,6 +1162,8 @@ private void completeRefilterUpdate( final TableUpdate upstream, final TableUpdateImpl update, final RowSet newMapping) { + // TODO: IS THIS CONSISTENT WITH THE REMOVAL PROCESSING? + // Compute added/removed in post-shift keyspace. update.added = newMapping.minus(getRowSet()); From b18db5216f6b7663c53624ee4ceedb7a5c36e198 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 24 Sep 2024 17:12:24 -0400 Subject: [PATCH 19/20] unit test --- .../engine/table/impl/QueryTable.java | 2 -- .../table/impl/QueryTableWhereTest.java | 31 ++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index b622bfab69d..ee31098d92c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1162,8 +1162,6 @@ private void completeRefilterUpdate( final TableUpdate upstream, final TableUpdateImpl update, final RowSet newMapping) { - // TODO: IS THIS CONSISTENT WITH THE REMOVAL PROCESSING? - // Compute added/removed in post-shift keyspace. update.added = newMapping.minus(getRowSet()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 1ddf87dfa4a..75a79339460 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -31,6 +31,7 @@ import io.deephaven.engine.testutil.QueryTableTestBase.TableComparator; import io.deephaven.engine.testutil.generator.*; import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.engine.util.PrintListener; import io.deephaven.engine.util.TableTools; import io.deephaven.gui.table.filters.Condition; import io.deephaven.internal.log.LoggerFactory; @@ -43,7 +44,6 @@ import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -1647,4 +1647,33 @@ public void testBigDecimalCoercion() { final Table bi_result = table.where("X >= bi_5"); assertTableEquals(range_result, bi_result); } + + @Test + public void testAddAndRemoveRefilter() { + final QueryTable source = testRefreshingTable(i(10, 20, 30).toTracking(), stringCol("FV", "A", "B", "C"), intCol("Sentinel", 10, 20, 30)); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final QueryTable setTable = testRefreshingTable(i(10, 30).toTracking(), stringCol("FV", "A", "C")); + + final Table result = source.whereIn(setTable, "FV"); + final SimpleListener listener = new SimpleListener(result); + result.addUpdateListener(listener); + + final PrintListener plResult = new PrintListener("testAddAndRemoveRefilter-result", result); + assertTableEquals(source.where("FV in `A`, `C`"), result); + + updateGraph.runWithinUnitTestCycle(() -> { + TstUtils.addToTable(setTable, i(20), stringCol("FV", "B")); + TstUtils.removeRows(setTable, i(30)); + setTable.notifyListeners(i(20), i(30), i()); + TstUtils.addToTable(source, i(10), stringCol("FV", "A"), intCol("Sentinel", 40)); + source.notifyListeners(i(10), i(10), i()); + }); + + assertEquals(i(10, 20), listener.update.added()); + assertEquals(i(10, 30), listener.update.removed()); + assertEquals(i(), listener.update.modified()); + + assertTableEquals(source.where("FV in `A`, `B`"), result); + } } From 115da85f28ebbba3062c7102911a62c690149e77 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 24 Sep 2024 17:25:06 -0400 Subject: [PATCH 20/20] spotless --- .../io/deephaven/engine/table/impl/QueryTableWhereTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 75a79339460..8b581fd4fce 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -1650,13 +1650,14 @@ public void testBigDecimalCoercion() { @Test public void testAddAndRemoveRefilter() { - final QueryTable source = testRefreshingTable(i(10, 20, 30).toTracking(), stringCol("FV", "A", "B", "C"), intCol("Sentinel", 10, 20, 30)); + final QueryTable source = testRefreshingTable(i(10, 20, 30).toTracking(), stringCol("FV", "A", "B", "C"), + intCol("Sentinel", 10, 20, 30)); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); final QueryTable setTable = testRefreshingTable(i(10, 30).toTracking(), stringCol("FV", "A", "C")); final Table result = source.whereIn(setTable, "FV"); - final SimpleListener listener = new SimpleListener(result); + final SimpleListener listener = new SimpleListener(result); result.addUpdateListener(listener); final PrintListener plResult = new PrintListener("testAddAndRemoveRefilter-result", result);