From b4b154986290da6a709f305450d9ee2e750185f1 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Mon, 10 Jul 2023 15:25:12 -0400 Subject: [PATCH] Error notifications need to participate in the satisfaction state machine for their listeners in the same way as regular TableUpdate notifications (#4155) --- .../impl/InstrumentedTableListenerBase.java | 69 ++++++++--- .../engine/table/impl/QueryTableTest.java | 115 +++++++++++++++++- .../testcase/RefreshingTableTestCase.java | 2 +- 3 files changed, 165 insertions(+), 21 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java index d5c1c6e4a2f..6bf40ed3b9b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java @@ -184,6 +184,46 @@ protected final void onFailureInternalWithDependent( } } + /** + * Record that we are enqueuing a new notification, and validate our state re: double-notification. This step is + * important to ensure that {@link #satisfied(long)} will return correct results. + */ + private void onNotificationCreated() { + final long currentStep = getUpdateGraph().clock().currentStep(); + if (lastCompletedStep == currentStep) { + // noinspection ThrowableNotThrown + Assert.statementNeverExecuted("Enqueued after lastCompletedStep already set to current step: " + this + + ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep); + } + + StepUpdater.forceUpdateRecordedStep( + LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); + } + + /** + * Validate recorded state before executing a notification. + * + * @param currentStep The current logical clock step + */ + private void beforeRunNotification(final long currentStep) { + Assert.eq(lastEnqueuedStep, "lastEnqueuedStep", currentStep, "currentStep"); + if (lastCompletedStep >= currentStep) { + throw new IllegalStateException( + "Execution began after lastCompletedStep already set to current step: " + this + + ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep); + } + } + + /** + * Update recorded state after executing a notification. + * + * @param currentStep The current logical clock step + */ + private void afterRunNotification(final long currentStep) { + StepUpdater.forceUpdateRecordedStep( + LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); + } + public class ErrorNotification extends AbstractNotification implements NotificationQueue.ErrorNotification { private final Throwable originalException; @@ -193,6 +233,7 @@ public class ErrorNotification extends AbstractNotification implements Notificat super(terminalListener); this.originalException = originalException; this.sourceEntry = sourceEntry; + onNotificationCreated(); } @Override @@ -200,12 +241,18 @@ public void run() { if (failed) { return; } + failed = true; AsyncErrorLogger.log(DateTimeUtils.nowMillisResolution(), entry, sourceEntry, originalException); + + final long currentStep = getUpdateGraph().clock().currentStep(); try { + beforeRunNotification(currentStep); onFailure(originalException, sourceEntry); } catch (Exception e) { log.error().append("Error propagating failure from ").append(sourceEntry).append(": ").append(e).endl(); + } finally { + afterRunNotification(currentStep); } } @@ -228,16 +275,7 @@ protected abstract class NotificationBase extends AbstractNotification implement NotificationBase(final TableUpdate update) { super(terminalListener); this.update = update.acquire(); - - final long currentStep = getUpdateGraph().clock().currentStep(); - if (lastCompletedStep == currentStep) { - // noinspection ThrowableNotThrown - Assert.statementNeverExecuted("Enqueued after lastCompletedStep already set to current step: " + this - + ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep); - } - - StepUpdater.forceUpdateRecordedStep( - LAST_ENQUEUED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); + onNotificationCreated(); } @Override @@ -282,13 +320,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) { final long currentStep = getUpdateGraph().clock().currentStep(); try { - Assert.eq(lastEnqueuedStep, "lastEnqueuedStep", currentStep, "currentStep"); - if (lastCompletedStep >= currentStep) { - throw new IllegalStateException( - "Execution began after lastCompletedStep already set to current step: " + this - + ", step=" + currentStep + ", lastCompletedStep=" + lastCompletedStep); - } - + beforeRunNotification(currentStep); invokeOnUpdate.run(); } catch (Exception e) { final LogEntry en = log.error().append("Uncaught exception for entry= "); @@ -321,9 +353,8 @@ private void doRunInternal(final Runnable invokeOnUpdate) { failed = true; onFailure(e, entry); } finally { + afterRunNotification(currentStep); entry.onUpdateEnd(); - StepUpdater.forceUpdateRecordedStep( - LAST_COMPLETED_STEP_UPDATER, InstrumentedTableListenerBase.this, currentStep); } } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index e4d7d3fd11e..b15c0e72587 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -30,6 +30,7 @@ import io.deephaven.engine.table.impl.select.MatchFilter.MatchType; import io.deephaven.engine.table.impl.sources.DeferredGroupingColumnSource; import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource; +import io.deephaven.engine.table.impl.sources.NullValueColumnSource; import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.testutil.*; @@ -50,7 +51,6 @@ import junit.framework.TestCase; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.groovy.util.Maps; -import org.jetbrains.annotations.MustBeInvokedByOverriders; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.Assert; @@ -65,6 +65,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.*; import java.util.stream.LongStream; @@ -3380,6 +3381,118 @@ protected MockUncoalescedTable copy() { } } + public void testMergedListenerWithFailure() { + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + // Note: parent1 and parent2 have no parents; they will be satisfied as long as the update graph is satisfied. + final QueryTable parent1 = (QueryTable) emptyTable(100).updateView("AnInt=1").coalesce(); + parent1.setRefreshing(true); + final ListenerRecorder listenerRecorder1 = new ListenerRecorder("ONE", parent1, null); + parent1.addUpdateListener(listenerRecorder1); + final QueryTable parent2 = (QueryTable) emptyTable(100).updateView("AnInt=2").coalesce(); + parent2.setRefreshing(true); + final ListenerRecorder listenerRecorder2 = new ListenerRecorder("TWO", parent2, null); + parent2.addUpdateListener(listenerRecorder2); + + // Result is just a place to record last notification step and satisfy MergedListener's constructor + final QueryTable result = new QueryTable( + RowSetFactory.flat(100).toTracking(), + Map.of("NullString", NullValueColumnSource.getInstance(String.class, null))); + result.setRefreshing(true); + assertEquals(updateGraph.clock().currentStep(), result.getLastNotificationStep()); + assertFalse(result.isFailed()); + + // Re-usable "fake update" that we'll propagate + final TableUpdate fakeUpdate = new TableUpdateImpl(RowSetFactory.flat(100), RowSetFactory.flat(100), + RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY); + + // Merged listener should properly combine the two upstream notifications + final AtomicLong processedStep = new AtomicLong(); + final MergedListener mergedListener = new MergedListener( + List.of(listenerRecorder1, listenerRecorder2), List.of(), "MERGED", result) { + @Override + protected void process() { + processedStep.set(getUpdateGraph().clock().currentStep()); + result.notifyListeners(fakeUpdate.acquire()); + } + }; + result.addParentReference(mergedListener); + listenerRecorder1.setMergedListener(mergedListener); + listenerRecorder2.setMergedListener(mergedListener); + + updateGraph.runWithinUnitTestCycle(() -> parent1.notifyListeners(fakeUpdate.acquire())); + assertEquals(updateGraph.clock().currentStep(), processedStep.get()); + assertEquals(updateGraph.clock().currentStep(), result.getLastNotificationStep()); + assertFalse(result.isFailed()); + + try (final SafeCloseable ignoredCompleter = updateGraph::completeCycleForUnitTests) { + final long previousStep = updateGraph.clock().currentStep(); + updateGraph.startCycleForUnitTests(false); + final long currentStep = updateGraph.clock().currentStep(); + + // We start out unsatisfied + assertFalse(listenerRecorder1.satisfied(currentStep)); + assertFalse(listenerRecorder2.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Adding an update notification changes nothing + parent1.notifyListeners(fakeUpdate.acquire()); + assertFalse(listenerRecorder1.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Adding an error notification changes nothing + parent2.notifyListenersOnError(new RuntimeException("FAKE ERROR"), null); + assertFalse(listenerRecorder2.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Since we have notifications enqueued, marking the update graph satisfied changes nothing + updateGraph.markSourcesRefreshedForUnitTests(); + assertFalse(listenerRecorder1.satisfied(currentStep)); + assertFalse(listenerRecorder2.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Flushing the first parent's table update notification satisfies just its listener recorder + updateGraph.flushOneNotificationForUnitTests(); + assertTrue(listenerRecorder1.satisfied(currentStep)); + assertFalse(listenerRecorder2.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Flushing the second parent's table update notification satisfies just its listener recorder + try (final SafeCloseable ignoredErrorExpectation = new ErrorExpectation()) { + updateGraph.flushOneNotificationForUnitTests(); + } + assertTrue(listenerRecorder1.satisfied(currentStep)); + assertTrue(listenerRecorder2.satisfied(currentStep)); + assertFalse(mergedListener.satisfied(currentStep)); + assertFalse(result.satisfied(currentStep)); + + // Make sure our recorded steps are the expected values: nothing has been delivered to the result yet + assertEquals(previousStep, processedStep.get()); + assertEquals(previousStep, result.getLastNotificationStep()); + assertFalse(result.isFailed()); + + // Flushing the merged listener's notification satisfies everything + try (final SafeCloseable ignoredErrorExpectation = new ErrorExpectation()) { + updateGraph.flushOneNotificationForUnitTests(); + } + assertTrue(listenerRecorder1.satisfied(currentStep)); + assertTrue(listenerRecorder2.satisfied(currentStep)); + assertTrue(mergedListener.satisfied(currentStep)); + assertTrue(result.satisfied(currentStep)); + + // Make sure our recorded steps are the expected values: we delivered a failure notification to the result, + // but the merged listener never processed a table update + assertEquals(previousStep, processedStep.get()); + assertEquals(currentStep, result.getLastNotificationStep()); + assertTrue(result.isFailed()); + } + } + public void testMultipleUpdateGraphs() { final QueryTable r1, s1, r2, s2; final UpdateGraph g1 = new DummyUpdateGraph("one"); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index 6ae5717c690..d5afee88bf2 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -166,7 +166,7 @@ protected static void simulateShiftAwareStep(final GenerateTableUpdates.Simulati // System.gc(); } - public class ErrorExpectation implements Closeable { + public class ErrorExpectation implements SafeCloseable { final boolean originalExpectError; public ErrorExpectation() {