Skip to content

Commit

Permalink
Replace Scheduler usage in PeriodicUpdateGraph with ScheduledExecutor…
Browse files Browse the repository at this point in the history
…Service. Standardize on nanoseconds for timing. Eliminate unnecessary initializations.
  • Loading branch information
rcaudy committed Oct 31, 2023
1 parent 0b59b8d commit 1d21a08
Showing 1 changed file with 54 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import io.deephaven.io.log.LogEntry;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.io.logger.Logger;
import io.deephaven.io.sched.Scheduler;
import io.deephaven.io.sched.TimedJob;
import io.deephaven.net.CommBase;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.datastructures.SimpleReferenceManager;
Expand All @@ -51,6 +48,8 @@
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* <p>
* This class uses a thread (or pool of threads) to periodically update a set of monitored update sources at a specified
Expand Down Expand Up @@ -132,25 +131,30 @@ public static PerformanceEntry createUpdatePerformanceEntry(
private final Thread refreshThread;
private volatile boolean running = true;

/**
* {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}.
*/
private final ScheduledExecutorService watchdogScheduler;

/**
* If this is set to a positive value, then we will call the {@link #watchDogTimeoutProcedure} if any single run
* loop takes longer than this value. The intention is to use this for strategies, or other queries, where a
* PeriodicUpdateGraph loop that is "stuck" is the equivalent of an error. Set the value with
* {@link #setWatchDogMillis(int)}.
*/
private int watchDogMillis = 0;
private volatile int watchDogMillis = 0;
/**
* If a timeout time has been {@link #setWatchDogMillis(int) set}, this procedure will be called if any single run
* loop takes longer than the value specified. Set the value with
* {@link #setWatchDogTimeoutProcedure(LongConsumer)}.
*/
private LongConsumer watchDogTimeoutProcedure = null;
private volatile LongConsumer watchDogTimeoutProcedure;

public static final String ALLOW_UNIT_TEST_MODE_PROP = "PeriodicUpdateGraph.allowUnitTestMode";
private final boolean allowUnitTestMode;
private int notificationAdditionDelay = 0;
private int notificationAdditionDelay;
private Random notificationRandomizer = new Random(0);
private boolean unitTestMode = false;
private boolean unitTestMode;
private ExecutorService unitTestRefreshThreadPool;

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
Expand All @@ -162,27 +166,27 @@ public static PerformanceEntry createUpdatePerformanceEntry(
private final long minimumCycleDurationToLogNanos;

/** when to next flush the performance tracker; initializes to zero to force a flush on start */
private long nextUpdatePerformanceTrackerFlushTime = 0;
private long nextUpdatePerformanceTrackerFlushTimeNanos;

/**
* How many cycles we have not logged, but were non-zero.
*/
private long suppressedCycles = 0;
private long suppressedCyclesTotalNanos = 0;
private long suppressedCyclesTotalSafePointTimeMillis = 0;
private long suppressedCycles;
private long suppressedCyclesTotalNanos;
private long suppressedCyclesTotalSafePointTimeMillis;

/**
* Accumulated UpdateGraph exclusive lock waits for the current cycle (or previous, if idle).
*/
private long currentCycleLockWaitTotalNanos = 0;
private long currentCycleLockWaitTotalNanos;
/**
* Accumulated delays due to intracycle yields for the current cycle (or previous, if idle).
*/
private long currentCycleYieldTotalNanos = 0L;
private long currentCycleYieldTotalNanos;
/**
* Accumulated delays due to intracycle sleeps for the current cycle (or previous, if idle).
*/
private long currentCycleSleepTotalNanos = 0L;
private long currentCycleSleepTotalNanos;

public static class AccumulatedCycleStats {
/**
Expand Down Expand Up @@ -331,6 +335,14 @@ public PeriodicUpdateGraph(
}
}), "PeriodicUpdateGraph." + name + ".refreshThread");
refreshThread.setDaemon(true);
watchdogScheduler = Executors.newSingleThreadScheduledExecutor(
new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) {
@Override
public Thread newThread(@NotNull final Runnable r) {
// Not a refresh thread, but should still be instrumented for debugging purposes.
return super.newThread(ThreadInitializationFactory.wrapRunnable(r));
}
});

updatePerformanceTracker = new UpdatePerformanceTracker(this);
}
Expand Down Expand Up @@ -593,15 +605,6 @@ public void setWatchDogTimeoutProcedure(LongConsumer procedure) {
this.watchDogTimeoutProcedure = procedure;
}

private class WatchdogJob extends TimedJob {
@Override
public void timedOut() {
if (watchDogTimeoutProcedure != null) {
watchDogTimeoutProcedure.accept(watchDogMillis);
}
}
}

/**
* Install a real NotificationProcessor and start the primary refresh thread.
*
Expand Down Expand Up @@ -1092,7 +1095,7 @@ public Runnable flushAllNormalNotificationsForUnitTests(@NotNull final BooleanSu
final ControlledNotificationProcessor controlledNotificationProcessor = new ControlledNotificationProcessor();
notificationProcessor = controlledNotificationProcessor;
final Future<?> flushJobFuture = unitTestRefreshThreadPool.submit(() -> {
final long deadlineNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
final long deadlineNanoTime = System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis);
boolean flushed;
while ((flushed = flushOneNotificationForUnitTestsInternal()) || !done.getAsBoolean()) {
if (!flushed) {
Expand Down Expand Up @@ -1639,8 +1642,6 @@ private static LogEntry appendAsMillisFromNanos(final LogEntry entry, final long
* {@link #getTargetCycleDurationMillis() minimum cycle time}.
*/
private void refreshTablesAndFlushNotifications() {
final Scheduler sched = CommBase.getScheduler();
final long startTime = sched.currentTimeMillis();
final long startTimeNanos = System.nanoTime();
jvmIntrospectionContext.startSample();

Expand All @@ -1649,17 +1650,20 @@ private void refreshTablesAndFlushNotifications() {
} else {
currentCycleLockWaitTotalNanos = currentCycleYieldTotalNanos = currentCycleSleepTotalNanos = 0L;

WatchdogJob watchdogJob = null;
ScheduledFuture<?> watchdogFuture = null;

final long localWatchdogMillis = watchDogMillis;
final LongConsumer localWatchdogTimeoutProcedure = watchDogTimeoutProcedure;
if ((watchDogMillis > 0) && (watchDogTimeoutProcedure != null)) {
watchdogJob = new WatchdogJob();
sched.installJob(watchdogJob, startTime + watchDogMillis);
watchdogFuture = watchdogScheduler.schedule(
() -> localWatchdogTimeoutProcedure.accept(localWatchdogMillis),
localWatchdogMillis, MILLISECONDS);
}

refreshAllTables();

if (watchdogJob != null) {
sched.cancelJob(watchdogJob);
if (watchdogFuture != null) {
watchdogFuture.cancel(true);
}
jvmIntrospectionContext.endSample();
final long cycleTimeNanos = System.nanoTime() - startTimeNanos;
Expand All @@ -1670,7 +1674,7 @@ private void refreshTablesAndFlushNotifications() {
Thread.yield();
}

waitForNextCycle(startTime, sched);
waitForNextCycle(startTimeNanos);
}

private void computeStatsAndLogCycle(final long cycleTimeNanos) {
Expand Down Expand Up @@ -1754,24 +1758,25 @@ private void logSuppressedCycles() {
* wait the remaining period.
* </p>
*
* @param startTime The start time of the last run cycle
* @param timeSource The source of time that startTime was based on
* @param startTimeNanos The start time of the last run cycle as reported by {@link System#nanoTime()}
*/
private void waitForNextCycle(final long startTime, final Scheduler timeSource) {
final long now = timeSource.currentTimeMillis();
long expectedEndTime = startTime + targetCycleDurationMillis;
private void waitForNextCycle(final long startTimeNanos) {
final long nowNanos = System.nanoTime();
long expectedEndTimeNanos = startTimeNanos + MILLISECONDS.toNanos(targetCycleDurationMillis);
if (minimumInterCycleSleep > 0) {
expectedEndTime = Math.max(expectedEndTime, now + minimumInterCycleSleep);
expectedEndTimeNanos =
Math.max(expectedEndTimeNanos, nowNanos + MILLISECONDS.toNanos(minimumInterCycleSleep));
}
if (expectedEndTime >= nextUpdatePerformanceTrackerFlushTime) {
nextUpdatePerformanceTrackerFlushTime = now + UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS;
if (expectedEndTimeNanos >= nextUpdatePerformanceTrackerFlushTimeNanos) {
nextUpdatePerformanceTrackerFlushTimeNanos =
nowNanos + MILLISECONDS.toNanos(UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS);
try {
updatePerformanceTracker.flush();
} catch (Exception err) {
log.error().append("Error flushing UpdatePerformanceTracker: ").append(err).endl();
}
}
waitForEndTime(expectedEndTime, timeSource);
waitForEndTime(expectedEndTimeNanos);
}

/**
Expand All @@ -1782,21 +1787,22 @@ private void waitForNextCycle(final long startTime, final Scheduler timeSource)
* If the delay is interrupted for any other {@link InterruptedException reason}, it will be logged and continue to
* wait the remaining period.
*
* @param expectedEndTime The time which we should sleep until
* @param timeSource The source of time that startTime was based on
* @param expectedEndTimeNanos The time (as reported by {@link System#nanoTime()}) which we should sleep until
*/
private void waitForEndTime(final long expectedEndTime, final Scheduler timeSource) {
long remainingMillis;
while ((remainingMillis = expectedEndTime - timeSource.currentTimeMillis()) > 0) {
private void waitForEndTime(final long expectedEndTimeNanos) {
long remainingNanos;
while ((remainingNanos = expectedEndTimeNanos - System.nanoTime()) > 0) {
if (refreshRequested.get()) {
return;
}
synchronized (refreshRequested) {
if (refreshRequested.get()) {
return;
}
final long millisToWait = remainingNanos / 1_000_000;
final int extraNanosToWait = (int) (remainingNanos - (millisToWait * 1_000_000));
try {
refreshRequested.wait(remainingMillis);
refreshRequested.wait(millisToWait, extraNanosToWait);
} catch (final InterruptedException logAndIgnore) {
log.warn().append("Interrupted while waiting on refreshRequested. Ignoring: ").append(logAndIgnore)
.endl();
Expand Down Expand Up @@ -1994,7 +2000,7 @@ public static final class Builder {
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
private long minimumCycleDurationToLogNanos = TimeUnit.MILLISECONDS.toNanos(
private long minimumCycleDurationToLogNanos = MILLISECONDS.toNanos(
Configuration.getInstance().getIntegerWithDefault(MINIMUM_CYCLE_DURATION_TO_LOG_MILLIS_PROP, 25));

private String name;
Expand Down

0 comments on commit 1d21a08

Please sign in to comment.