Skip to content

Commit

Permalink
ConstructSnapshot + BarrageMessageProducer: Use Static ConstructSnaps…
Browse files Browse the repository at this point in the history
…hot Fast Path (#4876)
  • Loading branch information
nbauernfeind committed Nov 22, 2023
1 parent 2c64fb1 commit eab615f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,18 @@ public static long callDataSnapshotFunction(
* Invokes the snapshot function in a loop until it succeeds with provably consistent results, or until
* {@code MAX_CONCURRENT_ATTEMPTS} or {@code MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS} are exceeded. Falls back to
* acquiring a shared update graph lock for a final attempt.
* <p>
* The supplied {@link SnapshotControl}'s {@link SnapshotControl#usePreviousValues usePreviousValues} will be
* invoked at the start of any snapshot attempt, and its {@link SnapshotControl#snapshotCompletedConsistently
* snapshotCompletedConsistently} will be invoked at the end of any snapshot attempt that is not provably
* inconsistent.
* <p>
* If the supplied {@link SnapshotControl} provides a null {@link SnapshotControl#getUpdateGraph UpdateGraph}, then
* this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s
* {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false},
* {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}, and
* the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE} will be supplied to {@code usePreviousValues} and
* {@code snapshotCompletedConsistently}.
*
* @param logPrefix A prefix for our log messages
* @param control A {@link SnapshotControl} to define the parameters and consistency for this snapshot
Expand All @@ -1190,13 +1202,27 @@ public static long callDataSnapshotFunction(

if (updateGraph == null) {
// This is a snapshot of static data. Just call the function with no frippery.
final boolean controlUsePrev = control.usePreviousValues(LogicalClock.NULL_CLOCK_VALUE);
if (controlUsePrev) {
throw new SnapshotUnsuccessfulException("Static snapshot requested previous values");
}

final boolean functionSuccessful = function.call(false, LogicalClock.NULL_CLOCK_VALUE);
Assert.assertion(functionSuccessful, "functionSuccessful");
if (!functionSuccessful) {
throw new SnapshotUnsuccessfulException("Static snapshot failed to execute snapshot function");
}
if (log.isDebugEnabled()) {
final long duration = System.currentTimeMillis() - overallStart;
log.debug().append(logPrefix)
.append(" Static snapshot function elapsed time ").append(duration).append(" ms").endl();
}

// notify control of successful snapshot
final boolean controlSuccessful =
control.snapshotCompletedConsistently(LogicalClock.NULL_CLOCK_VALUE, false);
if (!controlSuccessful) {
throw new SnapshotUnsuccessfulException("Static snapshot function succeeded but control failed");
}
return LogicalClock.NULL_CLOCK_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,18 +501,19 @@ public void addSubscription(final StreamObserver<MessageView> listener,
final Subscription subscription =
new Subscription(listener, options, cols, initialViewport, reverseViewport);

log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("subbing to columns ")
.append(FormatBitSet.formatBitSet(cols))
.endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("subbing to columns ")
.append(FormatBitSet.formatBitSet(cols))
.append(" and scheduling update immediately, for initial snapshot.")
.endl();
}

subscription.hasPendingUpdate = true;
pendingSubscriptions.add(subscription);

// we'd like to send the initial snapshot as soon as possible
log.debug().append(logPrefix).append(subscription.logPrefix)
.append("scheduling update immediately, for initial snapshot.").endl();
updatePropagationJob.scheduleImmediately();
}
}
Expand All @@ -528,6 +529,10 @@ private boolean findAndUpdateSubscription(final StreamObserver<MessageView> list
pendingSubscriptions.add(sub);
}

if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append("Find and update subscription scheduling immediately.")
.endl();
}
updatePropagationJob.scheduleImmediately();
return true;
}
Expand Down Expand Up @@ -570,24 +575,28 @@ public boolean updateSubscription(final StreamObserver<MessageView> listener, @N
}

sub.pendingColumns = cols;
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport and column updates.").endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for viewport and column updates.").endl();
}
});
}

public void removeSubscription(final StreamObserver<MessageView> listener) {
findAndUpdateSubscription(listener, sub -> {
sub.pendingDelete = true;
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for removed subscription.").endl();
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append(sub.logPrefix)
.append("scheduling update immediately, for removed subscription.").endl();
}
});
}

//////////////////////////////////////////////////
// Update Processing and Data Recording Methods //
//////////////////////////////////////////////////

public DeltaListener constructListener() {
public InstrumentedTableUpdateListener constructListener() {
return parentIsRefreshing ? new DeltaListener() : null;
}

Expand Down Expand Up @@ -1340,7 +1349,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
long elapsed = System.nanoTime() - start;
recordMetric(stats -> stats.snapshot, elapsed);

if (SUBSCRIPTION_GROWTH_ENABLED && snapshot.rowsIncluded.size() > 0) {
if (SUBSCRIPTION_GROWTH_ENABLED && !snapshot.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max number of rows that will
// not exceed the target UGP processing time percentage
PeriodicUpdateGraph updateGraph = parent.getUpdateGraph().cast();
Expand All @@ -1364,7 +1373,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

synchronized (this) {
if (growingSubscriptions.size() == 0 && pendingDeltas.isEmpty() && pendingError == null) {
if (growingSubscriptions.isEmpty() && pendingDeltas.isEmpty() && pendingError == null) {
return;
}

Expand Down Expand Up @@ -1450,6 +1459,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
if (snapshot != null) {
try (final BarrageStreamGenerator<MessageView> snapshotGenerator =
streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) {
if (log.isDebugEnabled()) {
log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size())
.append(" subscriber(s).").endl();
}
for (final Subscription subscription : growingSubscriptions) {
if (subscription.pendingDelete) {
continue;
Expand Down Expand Up @@ -1488,6 +1501,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}

if (numGrowingSubscriptions > 0) {
if (log.isDebugEnabled()) {
log.info().append(logPrefix).append("Have ").append(numGrowingSubscriptions)
.append(" growing subscriptions; scheduling next snapshot immediately.").endl();
}
updatePropagationJob.scheduleImmediately();
}

Expand Down Expand Up @@ -1882,7 +1899,7 @@ final class ColumnInfo {
delta.update.shifted().unapply(modifiedRemaining);
}

if (unfilledAdds.size() > 0) {
if (!unfilledAdds.isEmpty()) {
Assert.assertion(false, "Error: added:" + coalescer.added + " unfilled:" + unfilledAdds
+ " missing:" + coalescer.added.subSetForPositions(unfilledAdds));
}
Expand Down Expand Up @@ -2024,6 +2041,13 @@ private void finalizeSnapshotForSubscriptions(final List<Subscription> subscript
|| subscription.growingRemainingViewport.firstRowKey() >= parentTableSize
|| isBlinkTable;

if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append(subscription.logPrefix)
.append("finalizing snapshot isComplete=").append(isComplete)
.endl();
}

if (isComplete) {
// this subscription is complete, remove it from the growing list
subscription.isGrowingViewport = false;
Expand Down Expand Up @@ -2195,14 +2219,15 @@ public boolean snapshotCompletedConsistently(final long afterClockValue, final b
}
if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
.append("success=").append(success).append(", validStep=").append(resultValidStep).endl();
.append("success=").append(success).append(", validStep=").append(resultValidStep)
.append(", numSnapshotSubscriptions=").append(snapshotSubscriptions.size()).endl();
}
return success;
}

@Override
public UpdateGraph getUpdateGraph() {
return parent.getUpdateGraph();
return parent.isRefreshing() ? parent.getUpdateGraph() : null;
}
}

Expand Down

0 comments on commit eab615f

Please sign in to comment.