Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Embed WindowCheck in TimeSeriesFilter. #6081

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ public static class FilteredTable extends QueryTable implements WhereFilter.Reco
private final QueryTable source;
private boolean refilterMatchedRequested = false;
private boolean refilterUnmatchedRequested = false;
private WritableRowSet refilterRequestedRowset = null;
private MergedListener whereListener;

@ReferentialIntegrity
Expand Down Expand Up @@ -1006,6 +1007,16 @@ public void requestRecomputeMatched() {
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
if (refilterRequestedRowset == null) {
refilterRequestedRowset = rowSet.copy();
} else {
refilterRequestedRowset.insert(rowSet);
}
Require.neqNull(whereListener, "whereListener").notifyChanges();
}

/**
* Note that refilterRequested is only accessible so that {@link WhereListener} can get to it and is not part of
* the public API.
Expand All @@ -1014,7 +1025,7 @@ public void requestRecomputeMatched() {
*/
@InternalUseOnly
boolean refilterRequested() {
return refilterUnmatchedRequested || refilterMatchedRequested;
return refilterUnmatchedRequested || refilterMatchedRequested || refilterRequestedRowset != null;
}

@NotNull
Expand Down Expand Up @@ -1065,19 +1076,24 @@ void doRefilter(
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(source.getRowSet().copy());
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
(matchedRows, unusedMods) -> completeRefilterUpdate(listener, upstream, update, matchedRows),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = refilterUnmatchedRequested = false;
refilterRequestedRowset = null;
} else if (refilterUnmatchedRequested) {
// things that are added or removed are already reflected in source.getRowSet
final WritableRowSet unmatchedRows = source.getRowSet().minus(getRowSet());
// we must check rows that have been modified instead of just preserving them
if (upstream != null) {
unmatchedRows.insert(upstream.modified());
}
final RowSet unmatched = unmatchedRows.copy();
final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
filterExecution.scheduleCompletion((adds, mods) -> {
if (refilterRequestedRowset != null) {
unmatchedRows.insert(refilterRequestedRowset);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We copy() unmatchedRows below, and I think that is redundant. As-is, we're still never closing it.

refilterRequestedRowset = null;
}
final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(unmatchedRows);
filterExecution.scheduleCompletion((adds, unusedMods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, but for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
Expand All @@ -1098,14 +1114,39 @@ void doRefilter(
matchedRows.insert(upstream.added());
matchedRows.insert(upstream.modified());
}
final RowSet matchedClone = matchedRows.copy();
if (refilterRequestedRowset != null) {
matchedRows.insert(refilterRequestedRowset);
refilterRequestedRowset = null;
}

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(matchedClone);
listener.makeRefilterExecution(matchedRows);
filterExecution.scheduleCompletion(
(adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
(adds, unusedMods) -> completeRefilterUpdate(listener, upstream, update, adds),
exception -> errorRefilterUpdate(listener, exception, upstream));
refilterMatchedRequested = false;
} else if (refilterRequestedRowset != null) {
final WritableRowSet rowsToFilter = refilterRequestedRowset;
if (upstream != null) {
rowsToFilter.insert(upstream.added());
rowsToFilter.insert(upstream.modified());
}

final WhereListener.ListenerFilterExecution filterExecution =
listener.makeRefilterExecution(rowsToFilter);

filterExecution.scheduleCompletion((adds, unusedMods) -> {
final WritableRowSet newMapping = adds.writableCast();
// add back what we previously matched, except for modifications and removals
try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
previouslyMatched.remove(rowsToFilter);
newMapping.insert(previouslyMatched);
}
completeRefilterUpdate(listener, upstream, update, adds);
}, exception -> errorRefilterUpdate(listener, exception, upstream));


refilterRequestedRowset = null;
} else {
throw new IllegalStateException("Refilter called when a refilter was not requested!");
}
Expand All @@ -1118,22 +1159,21 @@ private void completeRefilterUpdate(
final RowSet newMapping) {
// Compute added/removed in post-shift keyspace.
update.added = newMapping.minus(getRowSet());
final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping);

// Update our index in post-shift keyspace.
getRowSet().writableCast().remove(postShiftRemovals);
getRowSet().writableCast().insert(update.added);
try (final WritableRowSet postShiftRemovals = getRowSet().minus(newMapping)) {
getRowSet().writableCast().resetTo(newMapping);

// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
// Note that removed must be propagated to listeners in pre-shift keyspace.
if (upstream != null) {
upstream.shifted().unapply(postShiftRemovals);
}
update.removed.writableCast().insert(postShiftRemovals);
}
update.removed.writableCast().insert(postShiftRemovals);

if (upstream == null || upstream.modified().isEmpty()) {
update.modified = RowSetFactory.empty();
} else {
update.modified = upstream.modified().intersect(newMapping);
update.modified = upstream.modified().intersect(getRowSet());
update.modified.writableCast().remove(update.added);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,21 @@ public void requestRecompute() {

@Override
public void requestRecomputeUnmatched() {
// TODO: No need to recompute matched rows
// TODO: No need to recompute matched rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@Override
public void requestRecomputeMatched() {
// TODO: No need to recompute unmatched rows
// TODO: No need to recompute unmatched rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}

@Override
public void requestRecompute(RowSet rowSet) {
// TODO: No need to recompute the remaining rows (https://github.com/deephaven/deephaven-core/issues/6083)
doRecompute = true;
Require.neqNull(mergedListener, "mergedListener").notifyChanges();
}
Expand Down
Loading