Skip to content

Commit

Permalink
Error notifications need to participate in the satisfaction state mac…
Browse files Browse the repository at this point in the history
…hine for their listeners in the same way as regular TableUpdate notifications (#4155)
  • Loading branch information
rcaudy committed Jul 10, 2023
1 parent 3299027 commit b4b1549
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,46 @@ protected final void onFailureInternalWithDependent(
}
}

/**
* Record that we are enqueuing a new notification, and validate our state re: double-notification. This step is
* important to ensure that {@link #satisfied(long)} will return correct results.
*/
private void onNotificationCreated() {
final long currentStep = getUpdateGraph().clock().currentStep();
if (lastCompletedStep == currentStep) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted("Enqueued after lastCompletedStep already set to current step: " + this
+ ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep);
}

StepUpdater.forceUpdateRecordedStep(
LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
}

/**
* Validate recorded state before executing a notification.
*
* @param currentStep The current logical clock step
*/
private void beforeRunNotification(final long currentStep) {
Assert.eq(lastEnqueuedStep, "lastEnqueuedStep", currentStep, "currentStep");
if (lastCompletedStep >= currentStep) {
throw new IllegalStateException(
"Execution began after lastCompletedStep already set to current step: " + this
+ ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep);
}
}

/**
* Update recorded state after executing a notification.
*
* @param currentStep The current logical clock step
*/
private void afterRunNotification(final long currentStep) {
StepUpdater.forceUpdateRecordedStep(
LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
}

public class ErrorNotification extends AbstractNotification implements NotificationQueue.ErrorNotification {

private final Throwable originalException;
Expand All @@ -193,19 +233,26 @@ public class ErrorNotification extends AbstractNotification implements Notificat
super(terminalListener);
this.originalException = originalException;
this.sourceEntry = sourceEntry;
onNotificationCreated();
}

@Override
public void run() {
if (failed) {
return;
}

failed = true;
AsyncErrorLogger.log(DateTimeUtils.nowMillisResolution(), entry, sourceEntry, originalException);

final long currentStep = getUpdateGraph().clock().currentStep();
try {
beforeRunNotification(currentStep);
onFailure(originalException, sourceEntry);
} catch (Exception e) {
log.error().append("Error propagating failure from ").append(sourceEntry).append(": ").append(e).endl();
} finally {
afterRunNotification(currentStep);
}
}

Expand All @@ -228,16 +275,7 @@ protected abstract class NotificationBase extends AbstractNotification implement
NotificationBase(final TableUpdate update) {
super(terminalListener);
this.update = update.acquire();

final long currentStep = getUpdateGraph().clock().currentStep();
if (lastCompletedStep == currentStep) {
// noinspection ThrowableNotThrown
Assert.statementNeverExecuted("Enqueued after lastCompletedStep already set to current step: " + this
+ ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep);
}

StepUpdater.forceUpdateRecordedStep(
LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
onNotificationCreated();
}

@Override
Expand Down Expand Up @@ -282,13 +320,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) {

final long currentStep = getUpdateGraph().clock().currentStep();
try {
Assert.eq(lastEnqueuedStep, "lastEnqueuedStep", currentStep, "currentStep");
if (lastCompletedStep >= currentStep) {
throw new IllegalStateException(
"Execution began after lastCompletedStep already set to current step: " + this
+ ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep);
}

beforeRunNotification(currentStep);
invokeOnUpdate.run();
} catch (Exception e) {
final LogEntry en = log.error().append("Uncaught exception for entry= ");
Expand Down Expand Up @@ -321,9 +353,8 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
failed = true;
onFailure(e, entry);
} finally {
afterRunNotification(currentStep);
entry.onUpdateEnd();
StepUpdater.forceUpdateRecordedStep(
LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.deephaven.engine.table.impl.select.MatchFilter.MatchType;
import io.deephaven.engine.table.impl.sources.DeferredGroupingColumnSource;
import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.table.impl.util.ColumnHolder;
import io.deephaven.engine.testutil.*;
Expand All @@ -50,7 +51,6 @@
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.groovy.util.Maps;
import org.jetbrains.annotations.MustBeInvokedByOverriders;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
Expand All @@ -65,6 +65,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -3380,6 +3381,118 @@ protected MockUncoalescedTable copy() {
}
}

public void testMergedListenerWithFailure() {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();

// Note: parent1 and parent2 have no parents; they will be satisfied as long as the update graph is satisfied.
final QueryTable parent1 = (QueryTable) emptyTable(100).updateView("AnInt=1").coalesce();
parent1.setRefreshing(true);
final ListenerRecorder listenerRecorder1 = new ListenerRecorder("ONE", parent1, null);
parent1.addUpdateListener(listenerRecorder1);
final QueryTable parent2 = (QueryTable) emptyTable(100).updateView("AnInt=2").coalesce();
parent2.setRefreshing(true);
final ListenerRecorder listenerRecorder2 = new ListenerRecorder("TWO", parent2, null);
parent2.addUpdateListener(listenerRecorder2);

// Result is just a place to record last notification step and satisfy MergedListener's constructor
final QueryTable result = new QueryTable(
RowSetFactory.flat(100).toTracking(),
Map.of("NullString", NullValueColumnSource.getInstance(String.class, null)));
result.setRefreshing(true);
assertEquals(updateGraph.clock().currentStep(), result.getLastNotificationStep());
assertFalse(result.isFailed());

// Re-usable "fake update" that we'll propagate
final TableUpdate fakeUpdate = new TableUpdateImpl(RowSetFactory.flat(100), RowSetFactory.flat(100),
RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);

// Merged listener should properly combine the two upstream notifications
final AtomicLong processedStep = new AtomicLong();
final MergedListener mergedListener = new MergedListener(
List.of(listenerRecorder1, listenerRecorder2), List.of(), "MERGED", result) {
@Override
protected void process() {
processedStep.set(getUpdateGraph().clock().currentStep());
result.notifyListeners(fakeUpdate.acquire());
}
};
result.addParentReference(mergedListener);
listenerRecorder1.setMergedListener(mergedListener);
listenerRecorder2.setMergedListener(mergedListener);

updateGraph.runWithinUnitTestCycle(() -> parent1.notifyListeners(fakeUpdate.acquire()));
assertEquals(updateGraph.clock().currentStep(), processedStep.get());
assertEquals(updateGraph.clock().currentStep(), result.getLastNotificationStep());
assertFalse(result.isFailed());

try (final SafeCloseable ignoredCompleter = updateGraph::completeCycleForUnitTests) {
final long previousStep = updateGraph.clock().currentStep();
updateGraph.startCycleForUnitTests(false);
final long currentStep = updateGraph.clock().currentStep();

// We start out unsatisfied
assertFalse(listenerRecorder1.satisfied(currentStep));
assertFalse(listenerRecorder2.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Adding an update notification changes nothing
parent1.notifyListeners(fakeUpdate.acquire());
assertFalse(listenerRecorder1.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Adding an error notification changes nothing
parent2.notifyListenersOnError(new RuntimeException("FAKE ERROR"), null);
assertFalse(listenerRecorder2.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Since we have notifications enqueued, marking the update graph satisfied changes nothing
updateGraph.markSourcesRefreshedForUnitTests();
assertFalse(listenerRecorder1.satisfied(currentStep));
assertFalse(listenerRecorder2.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Flushing the first parent's table update notification satisfies just its listener recorder
updateGraph.flushOneNotificationForUnitTests();
assertTrue(listenerRecorder1.satisfied(currentStep));
assertFalse(listenerRecorder2.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Flushing the second parent's table update notification satisfies just its listener recorder
try (final SafeCloseable ignoredErrorExpectation = new ErrorExpectation()) {
updateGraph.flushOneNotificationForUnitTests();
}
assertTrue(listenerRecorder1.satisfied(currentStep));
assertTrue(listenerRecorder2.satisfied(currentStep));
assertFalse(mergedListener.satisfied(currentStep));
assertFalse(result.satisfied(currentStep));

// Make sure our recorded steps are the expected values: nothing has been delivered to the result yet
assertEquals(previousStep, processedStep.get());
assertEquals(previousStep, result.getLastNotificationStep());
assertFalse(result.isFailed());

// Flushing the merged listener's notification satisfies everything
try (final SafeCloseable ignoredErrorExpectation = new ErrorExpectation()) {
updateGraph.flushOneNotificationForUnitTests();
}
assertTrue(listenerRecorder1.satisfied(currentStep));
assertTrue(listenerRecorder2.satisfied(currentStep));
assertTrue(mergedListener.satisfied(currentStep));
assertTrue(result.satisfied(currentStep));

// Make sure our recorded steps are the expected values: we delivered a failure notification to the result,
// but the merged listener never processed a table update
assertEquals(previousStep, processedStep.get());
assertEquals(currentStep, result.getLastNotificationStep());
assertTrue(result.isFailed());
}
}

public void testMultipleUpdateGraphs() {
final QueryTable r1, s1, r2, s2;
final UpdateGraph g1 = new DummyUpdateGraph("one");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ protected static void simulateShiftAwareStep(final GenerateTableUpdates.Simulati
// System.gc();
}

public class ErrorExpectation implements Closeable {
public class ErrorExpectation implements SafeCloseable {
final boolean originalExpectError;

public ErrorExpectation() {
Expand Down

0 comments on commit b4b1549

Please sign in to comment.