diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java
index 1f72098d501..fdbb01bedac 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java
@@ -1173,6 +1173,18 @@ public static long callDataSnapshotFunction(
* Invokes the snapshot function in a loop until it succeeds with provably consistent results, or until
* {@code MAX_CONCURRENT_ATTEMPTS} or {@code MAX_CONCURRENT_ATTEMPT_DURATION_MILLIS} are exceeded. Falls back to
* acquiring a shared update graph lock for a final attempt.
+ *
+ * The supplied {@link SnapshotControl}'s {@link SnapshotControl#usePreviousValues usePreviousValues} will be
+ * invoked at the start of any snapshot attempt, and its {@link SnapshotControl#snapshotCompletedConsistently
+ * snapshotCompletedConsistently} will be invoked at the end of any snapshot attempt that is not provably
+ * inconsistent.
+ *
+ * If the supplied {@link SnapshotControl} provides a null {@link SnapshotControl#getUpdateGraph UpdateGraph}, then
+ * this method will perform a static snapshot without locks or retrying. In this case, the {@link SnapshotControl}'s
+ * {@link SnapshotControl#usePreviousValues usePreviousValues} must return {@code false},
+ * {@link SnapshotControl#snapshotCompletedConsistently snapshotCompletedConsistently} must return {@code true}, and
+ * the {@link LogicalClock#NULL_CLOCK_VALUE NULL_CLOCK_VALUE} will be supplied to {@code usePreviousValues} and
+ * {@code snapshotCompletedConsistently}.
*
* @param logPrefix A prefix for our log messages
* @param control A {@link SnapshotControl} to define the parameters and consistency for this snapshot
@@ -1190,13 +1202,27 @@ public static long callDataSnapshotFunction(
if (updateGraph == null) {
// This is a snapshot of static data. Just call the function with no frippery.
+ final boolean controlUsePrev = control.usePreviousValues(LogicalClock.NULL_CLOCK_VALUE);
+ if (controlUsePrev) {
+ throw new SnapshotUnsuccessfulException("Static snapshot requested previous values");
+ }
+
final boolean functionSuccessful = function.call(false, LogicalClock.NULL_CLOCK_VALUE);
- Assert.assertion(functionSuccessful, "functionSuccessful");
+ if (!functionSuccessful) {
+ throw new SnapshotUnsuccessfulException("Static snapshot failed to execute snapshot function");
+ }
if (log.isDebugEnabled()) {
final long duration = System.currentTimeMillis() - overallStart;
log.debug().append(logPrefix)
.append(" Static snapshot function elapsed time ").append(duration).append(" ms").endl();
}
+
+ // notify control of successful snapshot
+ final boolean controlSuccessful =
+ control.snapshotCompletedConsistently(LogicalClock.NULL_CLOCK_VALUE, false);
+ if (!controlSuccessful) {
+ throw new SnapshotUnsuccessfulException("Static snapshot function succeeded but control failed");
+ }
return LogicalClock.NULL_CLOCK_VALUE;
}
diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java
index c8ad8192fed..33b48e654dc 100644
--- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java
+++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java
@@ -501,18 +501,19 @@ public void addSubscription(final StreamObserver listener,
final Subscription subscription =
new Subscription(listener, options, cols, initialViewport, reverseViewport);
- log.debug().append(logPrefix)
- .append(subscription.logPrefix)
- .append("subbing to columns ")
- .append(FormatBitSet.formatBitSet(cols))
- .endl();
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix)
+ .append(subscription.logPrefix)
+ .append("subbing to columns ")
+ .append(FormatBitSet.formatBitSet(cols))
+ .append(" and scheduling update immediately, for initial snapshot.")
+ .endl();
+ }
subscription.hasPendingUpdate = true;
pendingSubscriptions.add(subscription);
// we'd like to send the initial snapshot as soon as possible
- log.debug().append(logPrefix).append(subscription.logPrefix)
- .append("scheduling update immediately, for initial snapshot.").endl();
updatePropagationJob.scheduleImmediately();
}
}
@@ -528,6 +529,10 @@ private boolean findAndUpdateSubscription(final StreamObserver list
pendingSubscriptions.add(sub);
}
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix).append("Find and update subscription scheduling immediately.")
+ .endl();
+ }
updatePropagationJob.scheduleImmediately();
return true;
}
@@ -570,16 +575,20 @@ public boolean updateSubscription(final StreamObserver listener, @N
}
sub.pendingColumns = cols;
- log.debug().append(logPrefix).append(sub.logPrefix)
- .append("scheduling update immediately, for viewport and column updates.").endl();
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix).append(sub.logPrefix)
+ .append("scheduling update immediately, for viewport and column updates.").endl();
+ }
});
}
public void removeSubscription(final StreamObserver listener) {
findAndUpdateSubscription(listener, sub -> {
sub.pendingDelete = true;
- log.debug().append(logPrefix).append(sub.logPrefix)
- .append("scheduling update immediately, for removed subscription.").endl();
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix).append(sub.logPrefix)
+ .append("scheduling update immediately, for removed subscription.").endl();
+ }
});
}
@@ -587,7 +596,7 @@ public void removeSubscription(final StreamObserver listener) {
// Update Processing and Data Recording Methods //
//////////////////////////////////////////////////
- public DeltaListener constructListener() {
+ public InstrumentedTableUpdateListener constructListener() {
return parentIsRefreshing ? new DeltaListener() : null;
}
@@ -1340,7 +1349,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
long elapsed = System.nanoTime() - start;
recordMetric(stats -> stats.snapshot, elapsed);
- if (SUBSCRIPTION_GROWTH_ENABLED && snapshot.rowsIncluded.size() > 0) {
+ if (SUBSCRIPTION_GROWTH_ENABLED && !snapshot.rowsIncluded.isEmpty()) {
// very simplistic logic to take the last snapshot and extrapolate max number of rows that will
// not exceed the target UGP processing time percentage
PeriodicUpdateGraph updateGraph = parent.getUpdateGraph().cast();
@@ -1364,7 +1373,7 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}
synchronized (this) {
- if (growingSubscriptions.size() == 0 && pendingDeltas.isEmpty() && pendingError == null) {
+ if (growingSubscriptions.isEmpty() && pendingDeltas.isEmpty() && pendingError == null) {
return;
}
@@ -1450,6 +1459,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
if (snapshot != null) {
try (final BarrageStreamGenerator snapshotGenerator =
streamGeneratorFactory.newGenerator(snapshot, this::recordWriteMetrics)) {
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix).append("Sending snapshot to ").append(activeSubscriptions.size())
+ .append(" subscriber(s).").endl();
+ }
for (final Subscription subscription : growingSubscriptions) {
if (subscription.pendingDelete) {
continue;
@@ -1488,6 +1501,10 @@ private void updateSubscriptionsSnapshotAndPropagate() {
}
if (numGrowingSubscriptions > 0) {
+ if (log.isDebugEnabled()) {
+ log.info().append(logPrefix).append("Have ").append(numGrowingSubscriptions)
+ .append(" growing subscriptions; scheduling next snapshot immediately.").endl();
+ }
updatePropagationJob.scheduleImmediately();
}
@@ -1882,7 +1899,7 @@ final class ColumnInfo {
delta.update.shifted().unapply(modifiedRemaining);
}
- if (unfilledAdds.size() > 0) {
+ if (!unfilledAdds.isEmpty()) {
Assert.assertion(false, "Error: added:" + coalescer.added + " unfilled:" + unfilledAdds
+ " missing:" + coalescer.added.subSetForPositions(unfilledAdds));
}
@@ -2024,6 +2041,13 @@ private void finalizeSnapshotForSubscriptions(final List subscript
|| subscription.growingRemainingViewport.firstRowKey() >= parentTableSize
|| isBlinkTable;
+ if (log.isDebugEnabled()) {
+ log.debug().append(logPrefix)
+ .append(subscription.logPrefix)
+ .append("finalizing snapshot isComplete=").append(isComplete)
+ .endl();
+ }
+
if (isComplete) {
// this subscription is complete, remove it from the growing list
subscription.isGrowingViewport = false;
@@ -2195,14 +2219,15 @@ public boolean snapshotCompletedConsistently(final long afterClockValue, final b
}
if (log.isDebugEnabled()) {
log.debug().append(logPrefix)
- .append("success=").append(success).append(", validStep=").append(resultValidStep).endl();
+ .append("success=").append(success).append(", validStep=").append(resultValidStep)
+ .append(", numSnapshotSubscriptions=").append(snapshotSubscriptions.size()).endl();
}
return success;
}
@Override
public UpdateGraph getUpdateGraph() {
- return parent.getUpdateGraph();
+ return parent.isRefreshing() ? parent.getUpdateGraph() : null;
}
}