Skip to content

Commit

Permalink
simpler listener
Browse files Browse the repository at this point in the history
  • Loading branch information
cpwright committed Sep 19, 2024
1 parent 247f1d8 commit 22a2975
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1158,15 +1158,16 @@ private void completeRefilterUpdate(
final RowSet newMapping) {
// Compute added/removed in post-shift keyspace.
update.added = newMapping.minus(getRowSet());
final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping);

getRowSet().writableCast().resetTo(newMapping);
try (final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping)) {
getRowSet().writableCast().resetTo(newMapping);

// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
}
update.removed.writableCast().insert(postShiftRemovals);
}
update.removed = postShiftRemovals;

if (upstream == null || upstream.modified().isEmpty()) {
update.modified = RowSetFactory.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListenerAdapter;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
Expand Down Expand Up @@ -54,10 +54,10 @@ public class TimeSeriesFilter
private Runnable refreshFunctionForUnitTests;

/**
* The merged listener is responsible for listening to the WindowCheck result, updating our rowset that contains the
* The listener is responsible for listening to the WindowCheck result, updating our RowSet that contains the
* rows in our window, and then notifying the WhereListener that we are requesting recomputation.
*/
private TimeSeriesFilterMergedListener mergedListener;
private TimeSeriesFilterWindowListener windowListener;

/**
* Create a TimeSeriesFilter on the given column for the given period
Expand Down Expand Up @@ -168,12 +168,12 @@ public WritableRowSet filter(
throw new PreviousFilteringNotSupported();
}

Assert.neqNull(mergedListener, "mergedListener");
Assert.neqNull(windowListener, "windowListener");

if (invert) {
return selection.minus(mergedListener.inWindowRowSet);
return selection.minus(windowListener.inWindowRowSet);
} else {
return selection.intersect(mergedListener.inWindowRowSet);
return selection.intersect(windowListener.inWindowRowSet);
}
}

Expand All @@ -192,7 +192,7 @@ public void setRecomputeListener(RecomputeListener listener) {

@Override
public boolean satisfied(long step) {
return mergedListener.satisfied(step);
return windowListener.satisfied(step);
}

@Override
Expand Down Expand Up @@ -226,35 +226,25 @@ public String toString() {
'}';
}

// TODO: we should be listener not a merged listener
private class TimeSeriesFilterMergedListener extends MergedListener {
private class TimeSeriesFilterWindowListener extends InstrumentedTableUpdateListenerAdapter {
// the list of rows that exist within our window
final WritableRowSet inWindowRowSet = RowSetFactory.empty();

// this is our listener recorder for tableWithWindow
private final ListenerRecorder windowRecorder;

// the columnset that represents our window source
private final ModifiedColumnSet windowColumnSet;

// the column source containing the window; which we match on
private final ColumnSource<Object> windowColumnSource;

protected TimeSeriesFilterMergedListener(String listenerDescription, QueryTable tableWithWindow,
final ListenerRecorder windowRecorder, final String windowSourceName) {
super(Collections.singleton(windowRecorder), Collections.singleton(tableWithWindow), listenerDescription,
null);
this.windowRecorder = windowRecorder;
protected TimeSeriesFilterWindowListener(String listenerDescription, QueryTable tableWithWindow, final String windowSourceName) {
super(listenerDescription, tableWithWindow, false);
this.windowColumnSet = tableWithWindow.newModifiedColumnSet(windowSourceName);
this.windowColumnSource = tableWithWindow.getColumnSource(windowSourceName);
}

@Override
protected void process() {
Assert.assertion(windowRecorder.recordedVariablesAreValid(),
"windowRecorder.recordedVariablesAreValid()");
final TableUpdate update = windowRecorder.getUpdate();

@Override
public void onUpdate(TableUpdate update) {
inWindowRowSet.remove(update.removed());
final boolean windowModified = update.modifiedColumnSet().containsAny(windowColumnSet);
if (windowModified) {
Expand Down Expand Up @@ -303,21 +293,15 @@ public SafeCloseable beginOperation(@NotNull Table sourceTable) {
final QueryTable tableWithWindow = (QueryTable) pair.first;
refreshFunctionForUnitTests = pair.second;

final ListenerRecorder recorder =
new ListenerRecorder("TimeSeriesFilter-ListenerRecorder", tableWithWindow, null);
tableWithWindow.addUpdateListener(recorder);

mergedListener = new TimeSeriesFilterMergedListener(
"TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")",
tableWithWindow, recorder, windowSourceName);
manage(mergedListener);

recorder.setMergedListener(mergedListener);
windowListener = new TimeSeriesFilterWindowListener("TimeSeriesFilter(" + columnName + ", " + Duration.ofNanos(periodNanos) + ", " + invert + ")",
tableWithWindow, windowSourceName);
tableWithWindow.addUpdateListener(windowListener);
manage(windowListener);

// we are doing the first match, which is based on the entire set of values in the table
mergedListener.insertMatched(sourceTable.getRowSet());
windowListener.insertMatched(sourceTable.getRowSet());

// the only thing we hold is our mergedListener, which in turn holds the recorder and the windowed table
// the only thing we hold is our listener, which in turn holds the windowed table
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public String toString() {
* @param source the source table
* @param result our initialized result table
*/

private TimeWindowListenerImpl(final String inWindowColumnName, final InWindowColumnSource inWindowColumnSource,
final ListenerRecorder recorder, final QueryTable source, final QueryTable result) {
super(Collections.singleton(recorder), List.of(), "WindowCheck", result);
Expand Down

0 comments on commit 22a2975

Please sign in to comment.