Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimeSeriesFilter with embedded WindowCheck #6077

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
private final QueryTable source;
private boolean refilterMatchedRequested = false;
private boolean refilterUnmatchedRequested = false;
private WritableRowSet refilterRequestedRowset = null;
private MergedListener whereListener;

@ReferentialIntegrity
Expand Down Expand Up @@ -1006,6 +1007,16 @@ public void requestRecomputeMatched() {
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
if (refilterRequestedRowset == null) {
refilterRequestedRowset = rowSet.copy();
} else {
refilterRequestedRowset.insert(rowSet);
}
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

/**
* Note that refilterRequested is only accessible so that {@link WhereListener} can get to it and is not part of
* the public API.
Expand All @@ -1014,7 +1025,7 @@ public void requestRecomputeMatched() {
*/
@InternalUseOnly
boolean refilterRequested() {
return refilterUnmatchedRequested || refilterMatchedRequested;
return refilterUnmatchedRequested || refilterMatchedRequested || refilterRequestedRowset != null;
}

@NotNull
Expand Down Expand Up @@ -1068,13 +1079,18 @@ void doRefilter(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
refilterRequestedRowset = null;
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
final WritableRowSet unmatchedRows = source.getRowSet().minus(getRowSet());
// we must check rows that have been modified instead of just preserving them
if (upstream != null) {
unmatchedRows.insert(upstream.modified());
}
if (refilterRequestedRowset != null) {
unmatchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset = null;
}
final RowSet unmatched = unmatchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
filterExecution.scheduleCompletion((adds, mods) -> {
Expand All @@ -1098,6 +1114,10 @@ void doRefilter(
matchedRows.insert(upstream.added());
matchedRows.insert(upstream.modified());
}
if (refilterRequestedRowset != null) {
matchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset = null;
}
final RowSet matchedClone = matchedRows.copy();

final WhereListener.ListenerFilterExecution filterExecution =
Expand All @@ -1106,6 +1126,28 @@ void doRefilter(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else if (refilterRequestedRowset != null) {
final WritableRowSet rowsToFilter = refilterRequestedRowset.copy();
if (upstream != null) {
rowsToFilter.insert(upstream.added());
rowsToFilter.insert(upstream.modified());
}

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(rowsToFilter);

filterExecution.scheduleCompletion((adds, mods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
previouslyMatched.remove(rowsToFilter);
newMapping.insert(previouslyMatched);
}
completeRefilterUpdate(listener, upstream, update, adds);
}, exception -> errorRefilterUpdate(listener, exception, upstream));


refilterRequestedRowset = null;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,13 @@ public void requestRecomputeMatched() {
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
// TODO: No need to recompute the remaining rows
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@NotNull
@Override
public QueryTable getTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,88 @@
//
package io.deephaven.engine.table.impl.select;

import io.deephaven.base.Pair;
import io.deephaven.base.clock.Clock;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.WindowCheck;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.TestUseOnly;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.time.Instant;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

/**
* This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column).
* TimeSeriesFilter filters a timestamp colum within the table for recent rows.
*
* <p>
* The filtered column must be an Instant or long containing nanoseconds since the epoch. The computation of recency is
* delegated to {@link io.deephaven.engine.util.WindowCheck}.
* </p>
*
* <p>
* Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck}
* instead.
* When the filter is not inverted, null rows are not accepted and rows that match the window exactly are accepted.
* </p>
*/
public class TimeSeriesFilter
extends WhereFilterLivenessArtifactImpl
implements Runnable, NotificationQueue.Dependency {
protected final String columnName;
protected final long nanos;
implements NotificationQueue.Dependency {
private final String columnName;
private final long periodNanos;
private final boolean invert;
private final Clock clock;

private RecomputeListener listener;
private Runnable refreshFunctionForUnitTests;

/**
* We create the merged listener so that we can participate in dependency resolution, but the dependencies are not
* actually known until the first call to filter().
*/
private TimeSeriesFilterMergedListener mergedListener;

@SuppressWarnings("UnusedDeclaration")
public TimeSeriesFilter(String columnName, String period) {
public TimeSeriesFilter(final String columnName,
final String period) {
this(columnName, DateTimeUtils.parseDurationNanos(period));
}

public TimeSeriesFilter(String columnName, long nanos) {
Require.gtZero(nanos, "nanos");
public TimeSeriesFilter(final String columnName,
final long nanos) {
this(columnName, nanos, false);
}

public TimeSeriesFilter(final String columnName,
final long nanos,
final boolean invert) {
this(columnName, nanos, invert, null);
}

public TimeSeriesFilter(final String columnName,
final long periodNanos,
final boolean invert,
@Nullable final Clock clock) {
Require.gtZero(periodNanos, "periodNanos");
this.columnName = columnName;
this.nanos = nanos;
this.periodNanos = periodNanos;
this.invert = invert;
this.clock = clock;
}

@Override
Expand All @@ -63,7 +102,7 @@ public void init(@NotNull final TableDefinition tableDefinition) {}

@NotNull
@Override
public WritableRowSet filter(
public synchronized WritableRowSet filter(
@NotNull final RowSet selection,
@NotNull final RowSet fullSet,
@NotNull final Table table,
Expand All @@ -72,28 +111,13 @@ public WritableRowSet filter(
throw new PreviousFilteringNotSupported();
}

ColumnSource<Instant> dateColumn = table.getColumnSource(columnName);
if (!Instant.class.isAssignableFrom(dateColumn.getType())) {
throw new RuntimeException(columnName + " is not an Instant column!");
}

long nanoBoundary = getNowNanos() - nanos;
Assert.neqNull(mergedListener, "mergedListener");

RowSetBuilderSequential indexBuilder = RowSetFactory.builderSequential();
for (RowSet.Iterator it = selection.iterator(); it.hasNext();) {
long row = it.nextLong();
Instant instant = dateColumn.get(row);
long nanoValue = DateTimeUtils.epochNanos(instant);
if (nanoValue >= nanoBoundary) {
indexBuilder.appendKey(row);
}
if (invert) {
return selection.minus(mergedListener.inWindowRowSet);
} else {
return selection.intersect(mergedListener.inWindowRowSet);
}

return indexBuilder.build();
}

protected long getNowNanos() {
return Clock.system().currentTimeNanos();
}

@Override
Expand All @@ -107,12 +131,11 @@ public void setRecomputeListener(RecomputeListener listener) {
Assert.eqNull(this.listener, "this.listener");
this.listener = listener;
listener.setIsRefreshing(true);
updateGraph.addSource(this);
}

@Override
public boolean satisfied(long step) {
return updateGraph.satisfied(step);
return mergedListener.satisfied(step);
}

@Override
Expand All @@ -122,7 +145,7 @@ public UpdateGraph getUpdateGraph() {

@Override
public TimeSeriesFilter copy() {
return new TimeSeriesFilter(columnName, nanos);
return new TimeSeriesFilter(columnName, periodNanos, invert, clock);
}

@Override
Expand All @@ -131,13 +154,105 @@ public boolean isRefreshing() {
}

@Override
public void run() {
listener.requestRecomputeMatched();
public boolean permitParallelization() {
return false;
}

private class TimeSeriesFilterMergedListener extends MergedListener {
// the list of rows that exist within our window
final WritableRowSet inWindowRowSet = RowSetFactory.empty();

// this table contains our window
private final QueryTable tableWithWindow;
// this is our listener recorder for tableWithWindow
private final ListenerRecorder windowRecorder;

// the name of the window column
private final String windowSourceName;

private final ModifiedColumnSet windowColumnSet;

protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable tableWithWindow,
final ListenerRecorder windowRecorder, final String windowSourceName) {
super(Collections.singleton(windowRecorder), Collections.singleton(tableWithWindow), listenerDescription,
null);
this.tableWithWindow = tableWithWindow;
this.windowRecorder = windowRecorder;
this.windowSourceName = windowSourceName;
this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName);
}

@Override
protected void process() {
synchronized (TimeSeriesFilter.this) {
Assert.assertion(windowRecorder.recordedVariablesAreValid(),
"windowRecorder.recordedVariablesAreValid()");
final TableUpdate update = windowRecorder.getUpdate();

inWindowRowSet.remove(update.removed());
final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet);
if (windowModified) {
// we need to check on the modified rows; they may be in the window,
inWindowRowSet.remove(update.getModifiedPreShift());
}
update.shifted().apply(inWindowRowSet);
if (windowModified) {
final RowSet newlyMatched = insertMatched(update.modified());
try (final WritableRowSet movedOutOfWindow = update.modified().minus(newlyMatched)) {
if (movedOutOfWindow.isNonempty()) {
listener.requestRecompute(movedOutOfWindow);
}
}
}
insertMatched(update.added());
}
}

private RowSet insertMatched(final RowSet rowSet) {
// The original filter did not include nulls for a regular filter, so we do not include them here either to
// maintain compatibility. That also means the inverted filter is going to include nulls (as the null is
// less than the current time using Deephaven long comparisons).
final RowSet matched = tableWithWindow.getColumnSource(windowSourceName).match(false, false, false,
null, rowSet, Boolean.TRUE);
inWindowRowSet.insert(matched);
return matched;
}
}

@TestUseOnly
void runForUnitTests() {
refreshFunctionForUnitTests.run();
}

@Override
protected void destroy() {
super.destroy();
updateGraph.removeSource(this);
public SafeCloseable beginOperation(@NotNull Table sourceTable) {
String windowSourceName = "__Window_" + columnName;
while (sourceTable.getDefinition().getColumnNames().contains(windowSourceName)) {
windowSourceName = "_" + windowSourceName;
}

final Pair<Table, WindowCheck.TimeWindowListener> pair = WindowCheck.addTimeWindowInternal(clock,
(QueryTable) sourceTable, columnName, periodNanos + 1, windowSourceName, true);
final QueryTable tableWithWindow = (QueryTable) pair.first;
refreshFunctionForUnitTests = pair.second;

manage(tableWithWindow);

final ListenerRecorder recorder =
new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null);
tableWithWindow.addUpdateListener(recorder);

mergedListener = new TimeSeriesFilterMergedListener(
"TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")",
tableWithWindow, recorder, windowSourceName);
manage(mergedListener);

recorder.setMergedListener(mergedListener);

// we are doing the first match, which is based on the entire set of values in the table
mergedListener.insertMatched(sourceTable.getRowSet());

// the only thing we hold is our mergedListener, which in turn holds the recorder and the windowed table
return null;
}
}
Loading
Loading