Skip to content

Commit

Permalink
Introduce Blink-To-Append-Only Memoization (#4880)
Browse files Browse the repository at this point in the history
Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
  • Loading branch information
nbauernfeind and rcaudy authored Nov 24, 2023
1 parent 96d126b commit 8cfe8a8
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 141 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public static MemoizedOperationKey rollup(Collection<? extends Aggregation> aggr
includeConstituents);
}

public static MemoizedOperationKey blinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
return new BlinkToAppendOnly(sizeLimit, key);
}

private static boolean isMemoizable(SelectColumn[] selectColumn) {
return Arrays.stream(selectColumn)
.allMatch(sc -> sc instanceof SourceColumn || sc instanceof ReinterpretedColumn);
Expand Down Expand Up @@ -540,7 +544,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static WouldMatch wouldMatch(WouldMatchPair... pairs) {
public static MemoizedOperationKey wouldMatch(WouldMatchPair... pairs) {
return new WouldMatch(pairs);
}

Expand Down Expand Up @@ -592,7 +596,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static CrossJoin crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
public static MemoizedOperationKey crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
final MatchPair[] columnsToAdd, final int numRightBitsToReserve) {
return new CrossJoin(rightTable, columnsToMatch, columnsToAdd, numRightBitsToReserve);
}
Expand Down Expand Up @@ -650,7 +654,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}

public static RangeJoin rangeJoin(
public static MemoizedOperationKey rangeJoin(
@NotNull final Table rightTable,
@NotNull final Collection<? extends JoinMatch> exactMatches,
@NotNull final RangeJoinMatch rangeMatch,
Expand All @@ -672,4 +676,38 @@ protected static boolean equalWeakRefsByReferentIdentity(final WeakReference<?>
}
return t1 == t2;
}

private static class BlinkToAppendOnly extends AttributeAgnosticMemoizedOperationKey {
private final long sizeLimit;
private final Object key;

private BlinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
this.sizeLimit = sizeLimit;
this.key = Objects.requireNonNull(key);
}

@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}

final BlinkToAppendOnly blinkToAppendOnly = (BlinkToAppendOnly) other;

return sizeLimit == blinkToAppendOnly.sizeLimit && key.equals(blinkToAppendOnly.key);
}

@Override
public int hashCode() {
return 31 * key.hashCode() + Long.hashCode(sizeLimit);
}

@Override
BaseTable.CopyAttributeOperation copyType() {
return BaseTable.CopyAttributeOperation.None;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ public synchronized boolean snapshotCompletedConsistently(
final boolean usedPreviousValues) {
final boolean snapshotConsistent;
if (isInInitialNotificationWindow()) {
if (eventualListener == null) {
throw new IllegalStateException("Listener has not been set on end!");
}
if (eventualResult == null) {
throw new IllegalStateException("Result has not been set on end!");
}
Expand All @@ -132,7 +129,7 @@ public synchronized boolean snapshotCompletedConsistently(

// Be sure to record initial last notification step before subscribing
eventualResult.setLastNotificationStep(lastNotificationStep);
return subscribeForUpdates(eventualListener);
return eventualListener == null || subscribeForUpdates(eventualListener);
}

/**
Expand Down Expand Up @@ -160,7 +157,7 @@ boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) {
* @param resultTable The table that will result from this operation
*/
public synchronized void setListenerAndResult(
@NotNull final TableUpdateListener listener,
final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
eventualListener = listener;
eventualResult = resultTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public synchronized boolean snapshotCompletedConsistently(long afterClockValue,
}

@Override
public synchronized void setListenerAndResult(@NotNull final TableUpdateListener listener,
public synchronized void setListenerAndResult(final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
super.setListenerAndResult(listener, resultTable);
if (DEBUG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,19 @@ default boolean snapshotNeeded() {
*/
class Result<T extends DynamicNode & NotificationStepReceiver> {
public final T resultNode;
public final TableUpdateListener resultListener; // may be null if parent is non-ticking
/**
* The listener that should be attached to the parent. The listener may be null if the table does not need
* to respond to ticks from other sources (e.g. the parent is non-refreshing).
*/
public final TableUpdateListener resultListener;

public Result(@NotNull final T resultNode) {
this(resultNode, null);
}

/**
* Construct the result of an operation. The listener may be null if the parent is non-ticking and the table
* does not need to respond to ticks from other sources.
* Construct the result of an operation. The listener may be null if the table does not need to respond to
* ticks from other sources (e.g. the parent is non-refreshing).
*
* @param resultNode the result of the operation
* @param resultListener the listener that should be attached to the parent (or null)
Expand Down Expand Up @@ -3537,8 +3541,7 @@ private <T extends DynamicNode & NotificationStepReceiver> T getResultNoMemo(fin

resultTable.setValue(result.resultNode);
if (snapshotControl != null) {
snapshotControl.setListenerAndResult(Require.neqNull(result.resultListener, "resultListener"),
result.resultNode);
snapshotControl.setListenerAndResult(result.resultListener, result.resultNode);
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class EngineMetrics {
private static final Logger log = LoggerFactory.getLogger(EngineMetrics.class);
private static final boolean STATS_LOGGING_ENABLED = Configuration.getInstance().getBooleanWithDefault(
"statsLoggingEnabled", true);

private static volatile ProcessInfo PROCESS_INFO;
private static volatile EngineMetrics ENGINE_METRICS;

Expand Down Expand Up @@ -105,7 +106,9 @@ public QueryTable getProcessInfoQueryTable() {
}

public QueryTable getProcessMetricsQueryTable() {
return statsImpl == null ? null : (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
return statsImpl == null
? null
: (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
}

private StatsIntradayLogger getStatsLogger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,13 +934,28 @@ public void markSourcesRefreshedForUnitTests() {
*/
@TestUseOnly
public void completeCycleForUnitTests() {
completeCycleForUnitTests(false);
}

/**
* Do the second half of the update cycle, including flushing notifications, and completing the
* {@link LogicalClockImpl#completeUpdateCycle() LogicalClock} update cycle. Note that this happens on a simulated
* UpdateGraph run thread, rather than this thread.
*
* @param errorCaughtAndInFinallyBlock Whether an error was caught, and we are in a {@code finally} block
*/
private void completeCycleForUnitTests(boolean errorCaughtAndInFinallyBlock) {
Assert.assertion(unitTestMode, "unitTestMode");
Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
"logicalClock.currentStep()");
if (!errorCaughtAndInFinallyBlock) {
Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
"logicalClock.currentStep()");
}
try {
unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get();
} catch (InterruptedException | ExecutionException e) {
throw new UncheckedDeephavenException(e);
if (!errorCaughtAndInFinallyBlock) {
throw new UncheckedDeephavenException(e);
}
}
}

Expand Down Expand Up @@ -986,10 +1001,14 @@ public <T extends Exception> void runWithinUnitTestCycle(
final boolean sourcesSatisfied)
throws T {
startCycleForUnitTests(sourcesSatisfied);
boolean errorCaught = false;
try {
runnable.run();
} catch (final Throwable err) {
errorCaught = true;
throw err;
} finally {
completeCycleForUnitTests();
completeCycleForUnitTests(errorCaught);
}
}

Expand Down
Loading

0 comments on commit 8cfe8a8

Please sign in to comment.