Re-implement ParallelWhere using JobScheduler semantics. (deephav…
lbooker42 authored Jan 24, 2024
1 parent 68f9350 commit 3be488b
Showing 7 changed files with 448 additions and 751 deletions.

Large diffs are not rendered by default.
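In brief: the old InitialFilterExecution hand-rolled its parallelism — submitting child notifications to the OperationInitializer, tracking running threads for interruption, and counting outstanding jobs against a pendingSatisfaction queue. The rewrite delegates all of that to the JobScheduler abstraction (the driver logic lives in AbstractFilterExecution, likely the large diff not rendered above): each execution exposes a jobScheduler(), and callers receive results through paired success/error callbacks on scheduleCompletion. A self-contained sketch of that callback contract — the types below are toy stand-ins, not the engine's API:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    // Toy stand-ins for the engine types; only the callback shape is the point here.
    final class FilterCallbackSketch {
        // A JobScheduler runs units of work and reports failures to an error consumer.
        interface JobScheduler {
            void submit(Runnable task, Consumer<Exception> onError);
        }

        // Serial flavor: run inline on the calling thread, like ImmediateJobScheduler.
        static final JobScheduler IMMEDIATE = (task, onError) -> {
            try {
                task.run();
            } catch (Exception e) {
                onError.accept(e);
            }
        };

        // Mirrors the new scheduleCompletion((adds, mods) -> ..., exception -> ...) shape:
        // success delivers result row sets, failure goes to a dedicated error callback.
        static void scheduleCompletion(
                final JobScheduler scheduler,
                final List<Long> inputRows,
                final BiConsumer<List<Long>, List<Long>> onComplete,
                final Consumer<Exception> onError) {
            scheduler.submit(() -> {
                final List<Long> adds = inputRows.stream().filter(row -> row % 2 == 0).toList();
                onComplete.accept(adds, List.of()); // an initial filter has no modified rows
            }, onError);
        }

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            final CompletableFuture<List<Long>> result = new CompletableFuture<>();
            scheduleCompletion(IMMEDIATE, List.of(1L, 2L, 3L, 4L),
                    (adds, mods) -> result.complete(adds),
                    result::completeExceptionally);
            System.out.println(result.get()); // prints: [2, 4]
        }
    }

The IMMEDIATE stand-in mirrors ImmediateJobScheduler.INSTANCE: when parallelization is not permitted, the same code path simply runs inline on the calling thread.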

io/deephaven/engine/table/impl/InitialFilterExecution.java
@@ -1,109 +1,44 @@
 package io.deephaven.engine.table.impl;

-import io.deephaven.base.verify.Assert;
 import io.deephaven.engine.context.ExecutionContext;
-import io.deephaven.engine.exceptions.CancellationException;
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.table.ModifiedColumnSet;
 import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
 import io.deephaven.engine.table.impl.select.WhereFilter;
-import io.deephaven.engine.updategraph.NotificationQueue;
-import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
-import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
+import io.deephaven.engine.table.impl.util.JobScheduler;
+import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;

 /**
  * A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
  * the {@link io.deephaven.engine.updategraph.OperationInitializer OperationInitializer}.
  */
 class InitialFilterExecution extends AbstractFilterExecution {
-    private final QueryTable sourceTable;
-    private final boolean permitParallelization;
-    private final int segmentCount;
-    private final WhereFilter[] filters;
-
-    /**
-     * The pendingSatisfaction list is global to the root node of this InitialExecutionFilter. The outstanding children
-     * allows us to count how many jobs exist. If we have no outstanding jobs, but unsatisfied Notifications then an
-     * error has occurred.
-     */
-    private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> pendingSatisfaction;
-    private final Map<Thread, Thread> runningChildren;
-    private final AtomicBoolean cancelled;
-
-    /**
-     * The SubEntry lets us track query performance for the split jobs.
-     */
-    private BasePerformanceEntry basePerformanceEntry;
-
-    /**
-     * The InitialFilterExecution that represents all the work we are doing for this table.
-     */
-    private final InitialFilterExecution root;
+
+    private final int segmentCount;
+    private final boolean permitParallelization;
+
+    private final JobScheduler jobScheduler;

     InitialFilterExecution(
             final QueryTable sourceTable,
             final WhereFilter[] filters,
             final RowSet addedInput,
-            final long addStart,
-            final long addEnd,
-            final InitialFilterExecution parent,
-            final int filterIndex,
             final boolean usePrev) {
-        super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false,
-                ModifiedColumnSet.ALL, filterIndex);
-        this.sourceTable = sourceTable;
-        permitParallelization = permitParallelization(filters);
-        this.filters = filters;
-        if (parent == null) {
-            pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
-                    IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
-            segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
-                    ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
-                    : QueryTable.PARALLEL_WHERE_SEGMENTS;
-            runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
-            cancelled = new AtomicBoolean(false);
-            this.root = this;
-        } else {
-            pendingSatisfaction = parent.pendingSatisfaction;
-            segmentCount = parent.segmentCount;
-            this.root = parent.root;
-            runningChildren = null;
-            cancelled = null;
-        }
+        super(sourceTable, filters, addedInput, null, usePrev, false, ModifiedColumnSet.ALL);
+        segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0
+                ? ExecutionContext.getContext().getOperationInitializer().parallelismFactor()
+                : QueryTable.PARALLEL_WHERE_SEGMENTS;
+        permitParallelization = permitParallelization(filters)
+                && !QueryTable.DISABLE_PARALLEL_WHERE
+                && segmentCount > 1
+                && ExecutionContext.getContext().getOperationInitializer().canParallelize();
+
+        // If any of the filters can be parallelized, we will use the OperationInitializerJobScheduler.
+        if (permitParallelization) {
+            jobScheduler = new OperationInitializerJobScheduler();
+        } else {
+            jobScheduler = ImmediateJobScheduler.INSTANCE;
+        }
     }
-
-    @Override
-    void enqueueSubFilters(
-            List<AbstractFilterExecution> subFilters,
-            AbstractFilterExecution.CombinationNotification combinationNotification) {
-        synchronized (pendingSatisfaction) {
-            enqueueJobs(subFilters);
-            pendingSatisfaction.offer(combinationNotification);
-        }
-    }
-
-    private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
-        for (NotificationQueue.Notification notification : subFilters) {
-            ExecutionContext.getContext().getOperationInitializer().submit(() -> {
-                root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
-                try {
-                    if (!root.cancelled.get()) {
-                        notification.run();
-                    } else {
-                        // we must ensure that we, the parent InitialFilterExecution, are notified of completion
-                        onChildCompleted();
-                    }
-                    if (Thread.interrupted()) {
-                        // we would like to throw a query cancellation exception
-                        exceptionResult = new CancellationException("thread interrupted");
-                    }
-                } finally {
-                    root.runningChildren.remove(Thread.currentThread());
-                }
-            });
-        }
-    }

@@ -113,70 +48,16 @@ int getTargetSegments() {
     }

     @Override
-    boolean doParallelization(long numberOfRows) {
-        return permitParallelization
-                && ExecutionContext.getContext().getOperationInitializer().canParallelize()
-                && doParallelizationBase(numberOfRows);
-    }
-
-    @Override
-    void handleUncaughtException(Exception throwable) {
-        throw new UnsupportedOperationException(throwable);
-    }
-
-    @Override
-    void accumulatePerformanceEntry(BasePerformanceEntry entry) {
-        synchronized (root) {
-            if (root.basePerformanceEntry != null) {
-                root.basePerformanceEntry.accumulate(entry);
-            } else {
-                root.basePerformanceEntry = entry;
-            }
-        }
-    }
-
-    /**
-     * Run any satisfied jobs in the pendingSatisfaction list.
-     */
-    @Override
-    void onNoChildren() {
-        final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfied = new IntrusiveDoublyLinkedQueue<>(
-                IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
-        synchronized (pendingSatisfaction) {
-            for (final Iterator<NotificationQueue.Notification> it = pendingSatisfaction.iterator(); it.hasNext();) {
-                final NotificationQueue.Notification notification = it.next();
-                if (notification.canExecute(0)) {
-                    satisfied.offer(notification);
-                    it.remove();
-                }
-            }
-        }
-        if (satisfied.isEmpty()) {
-            return;
-        }
-        satisfied.forEach(NotificationQueue.Notification::run);
+    JobScheduler jobScheduler() {
+        return jobScheduler;
     }

     @Override
-    InitialFilterExecution makeChild(
-            final RowSet addedInput,
-            final long addStart,
-            final long addEnd,
-            final RowSet modifyInput,
-            final long modifyStart,
-            final long modifyEnd,
-            final int filterIndex) {
-        Assert.eqNull(modifyInput, "modifyInput");
-        return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex,
-                usePrev);
+    boolean permitParallelization() {
+        return permitParallelization;
     }

     BasePerformanceEntry getBasePerformanceEntry() {
         return basePerformanceEntry;
     }
-
-    void setCancelled() {
-        cancelled.set(true);
-        runningChildren.forEach((thread, ignored) -> thread.interrupt());
-    }
 }
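Two things carry the rewrite above: the parallelize-or-not decision is made exactly once, in the constructor, and is encoded in which JobScheduler gets installed; and getTargetSegments() tells the scheduler-driven base class (likely AbstractFilterExecution, whose large diff is not rendered on this page) how many slices to cut the input into, defaulting to the operation initializer's parallelismFactor() when PARALLEL_WHERE_SEGMENTS is unset. A minimal sketch of the slicing arithmetic such a driver would use — all names here are illustrative, not engine code:

    import java.util.ArrayList;
    import java.util.List;

    // Illustration of splitting a row range into getTargetSegments() slices with
    // ceiling division, as a scheduler-driven filter would; assumed names only.
    final class SegmentSplit {
        static List<long[]> split(final long size, final int targetSegments) {
            final long sliceSize = (size + targetSegments - 1) / targetSegments; // ceiling division
            final List<long[]> slices = new ArrayList<>();
            for (long start = 0; start < size; start += sliceSize) {
                slices.add(new long[] {start, Math.min(start + sliceSize, size) - 1});
            }
            return slices;
        }

        public static void main(String[] args) {
            // 10 rows across 4 segments -> 0..2, 3..5, 6..8, 9..9
            split(10, 4).forEach(s -> System.out.println(s[0] + ".." + s[1]));
        }
    }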
io/deephaven/engine/table/impl/QueryTable.java

@@ -28,6 +28,7 @@
 import io.deephaven.configuration.Configuration;
 import io.deephaven.datastructures.util.CollectionUtil;
 import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.table.impl.util.*;
 import io.deephaven.engine.updategraph.UpdateGraph;
 import io.deephaven.engine.exceptions.CancellationException;
 import io.deephaven.engine.liveness.LivenessScope;
@@ -48,17 +49,14 @@
 import io.deephaven.engine.table.impl.select.MatchPairFactory;
 import io.deephaven.engine.table.impl.select.SelectColumnFactory;
 import io.deephaven.engine.table.impl.updateby.UpdateBy;
-import io.deephaven.engine.table.impl.util.ImmediateJobScheduler;
-import io.deephaven.engine.table.impl.util.JobScheduler;
-import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
 import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
-import io.deephaven.engine.table.impl.util.FieldUtils;
 import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
 import io.deephaven.engine.table.iterators.*;
 import io.deephaven.engine.updategraph.DynamicNode;
 import io.deephaven.engine.util.*;
 import io.deephaven.engine.util.systemicmarking.SystemicObject;
 import io.deephaven.util.annotations.InternalUseOnly;
+import io.deephaven.util.annotations.ReferentialIntegrity;
 import io.deephaven.vector.Vector;
 import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
 import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
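The import shuffle above tracks the move of scheduling out of QueryTable: the explicit ImmediateJobScheduler / JobScheduler / OperationInitializerJobScheduler imports are no longer needed here because the scheduler choice now lives in the FilterExecution classes, while the new wildcard io.deephaven.engine.table.impl.util.* import and the @ReferentialIntegrity annotation support the delayed-error plumbing added below (the wildcard presumably also covers the DelayedErrorNotifier referenced in errorRefilterUpdate).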
@@ -982,6 +980,9 @@ public static class FilteredTable extends QueryTable implements WhereFilter.RecomputeListener {
         private boolean refilterUnmatchedRequested = false;
         private MergedListener whereListener;

+        @ReferentialIntegrity
+        Runnable delayedErrorReference;
+
         public FilteredTable(final TrackingRowSet currentMapping, final QueryTable source) {
             super(source.getDefinition(), currentMapping, source.columns, null, null);
             this.source = source;
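The new delayedErrorReference field keeps the DelayedErrorNotifier created in errorRefilterUpdate strongly reachable until it fires on a later cycle; @ReferentialIntegrity is Deephaven's marker for fields held only to prevent premature garbage collection of such helpers.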
@@ -1066,9 +1067,10 @@ void doRefilter(

             if (refilterMatchedRequested && refilterUnmatchedRequested) {
                 final WhereListener.ListenerFilterExecution filterExecution =
-                        listener.makeFilterExecution(source.getRowSet().copy());
+                        listener.makeRefilterExecution(source.getRowSet().copy());
                 filterExecution.scheduleCompletion(
-                        fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
+                        (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
+                        exception -> errorRefilterUpdate(listener, exception, upstream));
                 refilterMatchedRequested = refilterUnmatchedRequested = false;
             } else if (refilterUnmatchedRequested) {
                 // things that are added or removed are already reflected in source.getRowSet
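Note the shape change on scheduleCompletion, repeated in each refilter branch: the old single callback received the filter execution itself (fe) and read results off fe.addedResult, with no error path in doRefilter at all; the new API hands the added and modified result row sets to a success callback and routes failures to a separate consumer, so each branch can now report exceptions through errorRefilterUpdate.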
@@ -1078,9 +1080,9 @@ void doRefilter(
                     unmatchedRows.insert(upstream.modified());
                 }
                 final RowSet unmatched = unmatchedRows.copy();
-                final WhereListener.ListenerFilterExecution filterExecution = listener.makeFilterExecution(unmatched);
-                filterExecution.scheduleCompletion(fe -> {
-                    final WritableRowSet newMapping = fe.addedResult;
+                final WhereListener.ListenerFilterExecution filterExecution = listener.makeRefilterExecution(unmatched);
+                filterExecution.scheduleCompletion((adds, mods) -> {
+                    final WritableRowSet newMapping = adds.writableCast();
                     // add back what we previously matched, but for modifications and removals
                     try (final WritableRowSet previouslyMatched = getRowSet().copy()) {
                         if (upstream != null) {
@@ -1089,8 +1091,8 @@ void doRefilter(
                         }
                         newMapping.insert(previouslyMatched);
                     }
-                    completeRefilterUpdate(listener, upstream, update, fe.addedResult);
-                });
+                    completeRefilterUpdate(listener, upstream, update, adds);
+                }, exception -> errorRefilterUpdate(listener, exception, upstream));
                 refilterUnmatchedRequested = false;
             } else if (refilterMatchedRequested) {
                 // we need to take removed rows out of our rowSet so we do not read them, and also examine added or
@@ -1103,9 +1105,10 @@ void doRefilter(
                 final RowSet matchedClone = matchedRows.copy();

                 final WhereListener.ListenerFilterExecution filterExecution =
-                        listener.makeFilterExecution(matchedClone);
+                        listener.makeRefilterExecution(matchedClone);
                 filterExecution.scheduleCompletion(
-                        fe -> completeRefilterUpdate(listener, upstream, update, fe.addedResult));
+                        (adds, mods) -> completeRefilterUpdate(listener, upstream, update, adds),
+                        exception -> errorRefilterUpdate(listener, exception, upstream));
                 refilterMatchedRequested = false;
             } else {
                 throw new IllegalStateException("Refilter called when a refilter was not requested!");
@@ -1141,11 +1144,24 @@ private void completeRefilterUpdate(
             update.shifted = upstream == null ? RowSetShiftData.EMPTY : upstream.shifted();

             notifyListeners(update);
-            if (upstream != null) {
-                upstream.release();
-            }

-            listener.setFinalExecutionStep();
+            // Release the upstream update and set the final notification step.
+            listener.finalizeUpdate(upstream);
+        }
+
+        private void errorRefilterUpdate(final WhereListener listener, final Exception e, final TableUpdate upstream) {
+            // Notify listeners that we had an issue refreshing the table.
+            if (getLastNotificationStep() == updateGraph.clock().currentStep()) {
+                if (listener != null) {
+                    listener.forceReferenceCountToZero();
+                }
+                delayedErrorReference = new DelayedErrorNotifier(e, listener == null ? null : listener.entry, this);
+            } else {
+                notifyListenersOnError(e, listener == null ? null : listener.entry);
+                forceReferenceCountToZero();
+            }
+            // Release the upstream update and set the final notification step.
+            listener.finalizeUpdate(upstream);
         }

         private void setWhereListener(MergedListener whereListener) {
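errorRefilterUpdate must respect the update graph clock: a table may notify its listeners at most once per logical step, so if getLastNotificationStep() matches the current step, the failure is parked in delayedErrorReference (via DelayedErrorNotifier) for delivery on a later cycle; otherwise it is reported immediately and the table's reference count is forced to zero. A toy model of that rule, with purely illustrative names:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Toy model of the delayed-error rule: an error may only be delivered on a
    // clock step where the table has not already notified listeners.
    final class DelayedErrorToy {
        long currentStep = 1;
        long lastNotificationStep = 1; // we already notified during this step
        final Deque<Exception> delayed = new ArrayDeque<>();

        void onError(final Exception e) {
            if (lastNotificationStep == currentStep) {
                delayed.add(e); // park it, like delayedErrorReference
            } else {
                deliver(e);
            }
        }

        void advanceClock() {
            currentStep++;
            delayed.forEach(this::deliver); // DelayedErrorNotifier re-delivers next cycle
            delayed.clear();
        }

        void deliver(final Exception e) {
            System.out.println("step " + currentStep + ": notifyListenersOnError(" + e.getMessage() + ")");
        }

        public static void main(String[] args) {
            final DelayedErrorToy t = new DelayedErrorToy();
            t.onError(new Exception("filter failed")); // parked: already notified this step
            t.advanceClock();                          // delivered on step 2
        }
    }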
@@ -1226,41 +1242,18 @@ private QueryTable whereInternal(final WhereFilter... filters) {
             final CompletableFuture<TrackingWritableRowSet> currentMappingFuture =
                     new CompletableFuture<>();
-
             final InitialFilterExecution initialFilterExecution = new InitialFilterExecution(
-                    this, filters, rowSetToUse.copy(), 0, rowSetToUse.size(), null, 0,
-                    usePrev) {
-                @Override
-                void handleUncaughtException(Exception throwable) {
-                    currentMappingFuture.completeExceptionally(throwable);
-                }
-            };
-            final ExecutionContext executionContext = ExecutionContext.getContext();
-            initialFilterExecution.scheduleCompletion(x -> {
-                try (final SafeCloseable ignored = executionContext.open()) {
-                    if (x.exceptionResult != null) {
-                        currentMappingFuture.completeExceptionally(x.exceptionResult);
-                    } else {
-                        currentMappingFuture.complete(x.addedResult.toTracking());
-                    }
-                }
-            });
+                    this, filters, rowSetToUse.copy(), usePrev);
+            final TrackingWritableRowSet currentMapping;
+            initialFilterExecution.scheduleCompletion((adds, mods) -> {
+                currentMappingFuture.complete(adds.writableCast().toTracking());
+            }, currentMappingFuture::completeExceptionally);

-            boolean cancelled = false;
-            TrackingWritableRowSet currentMapping = null;
             try {
-                boolean done = false;
-                while (!done) {
-                    try {
-                        currentMapping = currentMappingFuture.get();
-                        done = true;
-                    } catch (InterruptedException e) {
-                        // cancel the job and wait for it to finish cancelling
-                        cancelled = true;
-                        initialFilterExecution.setCancelled();
-                    }
-                }
-            } catch (ExecutionException e) {
-                if (cancelled) {
+                currentMapping = currentMappingFuture.get();
+            } catch (ExecutionException | InterruptedException e) {
+                if (e instanceof InterruptedException) {
                     throw new CancellationException("interrupted while filtering");
                 } else if (e.getCause() instanceof RuntimeException) {
                     throw (RuntimeException) e.getCause();
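whereInternal shows the synchronous edge of the new API: the success/error callbacks feed a CompletableFuture and the caller blocks on get(). The old hand-rolled loop — catching InterruptedException, calling setCancelled(), and re-waiting — collapses into a single catch that maps interruption to the engine's CancellationException and unwraps RuntimeException causes. A condensed, runnable rendering of that pattern, using java.util.concurrent.CancellationException as a stand-in for the engine's exception type:

    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    // Condensed rendering of the new blocking pattern in whereInternal; the engine
    // throws its own CancellationException type, so this sketch stands alone.
    final class BlockOnCallbacks {
        static <T> T await(final CompletableFuture<T> future) {
            try {
                return future.get();
            } catch (ExecutionException | InterruptedException e) {
                if (e instanceof InterruptedException) {
                    // interruption means the caller gave up; surface it as a cancellation
                    throw new CancellationException("interrupted while filtering");
                }
                if (e.getCause() instanceof RuntimeException runtimeException) {
                    throw runtimeException; // unwrap and rethrow the original failure
                }
                throw new RuntimeException(e.getCause());
            }
        }

        public static void main(String[] args) {
            final CompletableFuture<String> future = new CompletableFuture<>();
            future.complete("filtered row set");
            System.out.println(await(future)); // prints: filtered row set
        }
    }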