From 1d21a08b3078532947c722bb5ba365e06bc64a99 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Mon, 30 Oct 2023 19:54:28 -0400 Subject: [PATCH] Replace Scheduler usage in PeriodicUpdateGraph with ScheduledExecutorService. Standardize on nanoseconds for timing. Eliminate unnecessary initializations. --- .../updategraph/impl/PeriodicUpdateGraph.java | 102 +++++++++--------- 1 file changed, 54 insertions(+), 48 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 447243b0f0c..cbae20d7a85 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -27,9 +27,6 @@ import io.deephaven.io.log.LogEntry; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.io.logger.Logger; -import io.deephaven.io.sched.Scheduler; -import io.deephaven.io.sched.TimedJob; -import io.deephaven.net.CommBase; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.TestUseOnly; import io.deephaven.util.datastructures.SimpleReferenceManager; @@ -51,6 +48,8 @@ import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** *

* This class uses a thread (or pool of threads) to periodically update a set of monitored update sources at a specified @@ -132,25 +131,30 @@ public static PerformanceEntry createUpdatePerformanceEntry( private final Thread refreshThread; private volatile boolean running = true; + /** + * {@link ScheduledExecutorService} used for scheduling the {@link #watchDogTimeoutProcedure}. + */ + private final ScheduledExecutorService watchdogScheduler; + /** * If this is set to a positive value, then we will call the {@link #watchDogTimeoutProcedure} if any single run * loop takes longer than this value. The intention is to use this for strategies, or other queries, where a * PeriodicUpdateGraph loop that is "stuck" is the equivalent of an error. Set the value with * {@link #setWatchDogMillis(int)}. */ - private int watchDogMillis = 0; + private volatile int watchDogMillis = 0; /** * If a timeout time has been {@link #setWatchDogMillis(int) set}, this procedure will be called if any single run * loop takes longer than the value specified. Set the value with * {@link #setWatchDogTimeoutProcedure(LongConsumer)}. */ - private LongConsumer watchDogTimeoutProcedure = null; + private volatile LongConsumer watchDogTimeoutProcedure; public static final String ALLOW_UNIT_TEST_MODE_PROP = "PeriodicUpdateGraph.allowUnitTestMode"; private final boolean allowUnitTestMode; - private int notificationAdditionDelay = 0; + private int notificationAdditionDelay; private Random notificationRandomizer = new Random(0); - private boolean unitTestMode = false; + private boolean unitTestMode; private ExecutorService unitTestRefreshThreadPool; public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP = @@ -162,27 +166,27 @@ public static PerformanceEntry createUpdatePerformanceEntry( private final long minimumCycleDurationToLogNanos; /** when to next flush the performance tracker; initializes to zero to force a flush on start */ - private long nextUpdatePerformanceTrackerFlushTime = 0; + private long nextUpdatePerformanceTrackerFlushTimeNanos; /** * How many cycles we have not logged, but were non-zero. */ - private long suppressedCycles = 0; - private long suppressedCyclesTotalNanos = 0; - private long suppressedCyclesTotalSafePointTimeMillis = 0; + private long suppressedCycles; + private long suppressedCyclesTotalNanos; + private long suppressedCyclesTotalSafePointTimeMillis; /** * Accumulated UpdateGraph exclusive lock waits for the current cycle (or previous, if idle). */ - private long currentCycleLockWaitTotalNanos = 0; + private long currentCycleLockWaitTotalNanos; /** * Accumulated delays due to intracycle yields for the current cycle (or previous, if idle). */ - private long currentCycleYieldTotalNanos = 0L; + private long currentCycleYieldTotalNanos; /** * Accumulated delays due to intracycle sleeps for the current cycle (or previous, if idle). */ - private long currentCycleSleepTotalNanos = 0L; + private long currentCycleSleepTotalNanos; public static class AccumulatedCycleStats { /** @@ -331,6 +335,14 @@ public PeriodicUpdateGraph( } }), "PeriodicUpdateGraph." + name + ".refreshThread"); refreshThread.setDaemon(true); + watchdogScheduler = Executors.newSingleThreadScheduledExecutor( + new NamingThreadFactory(PeriodicUpdateGraph.class, "watchdogScheduler", true) { + @Override + public Thread newThread(@NotNull final Runnable r) { + // Not a refresh thread, but should still be instrumented for debugging purposes. + return super.newThread(ThreadInitializationFactory.wrapRunnable(r)); + } + }); updatePerformanceTracker = new UpdatePerformanceTracker(this); } @@ -593,15 +605,6 @@ public void setWatchDogTimeoutProcedure(LongConsumer procedure) { this.watchDogTimeoutProcedure = procedure; } - private class WatchdogJob extends TimedJob { - @Override - public void timedOut() { - if (watchDogTimeoutProcedure != null) { - watchDogTimeoutProcedure.accept(watchDogMillis); - } - } - } - /** * Install a real NotificationProcessor and start the primary refresh thread. * @@ -1092,7 +1095,7 @@ public Runnable flushAllNormalNotificationsForUnitTests(@NotNull final BooleanSu final ControlledNotificationProcessor controlledNotificationProcessor = new ControlledNotificationProcessor(); notificationProcessor = controlledNotificationProcessor; final Future flushJobFuture = unitTestRefreshThreadPool.submit(() -> { - final long deadlineNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis); + final long deadlineNanoTime = System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis); boolean flushed; while ((flushed = flushOneNotificationForUnitTestsInternal()) || !done.getAsBoolean()) { if (!flushed) { @@ -1639,8 +1642,6 @@ private static LogEntry appendAsMillisFromNanos(final LogEntry entry, final long * {@link #getTargetCycleDurationMillis() minimum cycle time}. */ private void refreshTablesAndFlushNotifications() { - final Scheduler sched = CommBase.getScheduler(); - final long startTime = sched.currentTimeMillis(); final long startTimeNanos = System.nanoTime(); jvmIntrospectionContext.startSample(); @@ -1649,17 +1650,20 @@ private void refreshTablesAndFlushNotifications() { } else { currentCycleLockWaitTotalNanos = currentCycleYieldTotalNanos = currentCycleSleepTotalNanos = 0L; - WatchdogJob watchdogJob = null; + ScheduledFuture watchdogFuture = null; + final long localWatchdogMillis = watchDogMillis; + final LongConsumer localWatchdogTimeoutProcedure = watchDogTimeoutProcedure; if ((watchDogMillis > 0) && (watchDogTimeoutProcedure != null)) { - watchdogJob = new WatchdogJob(); - sched.installJob(watchdogJob, startTime + watchDogMillis); + watchdogFuture = watchdogScheduler.schedule( + () -> localWatchdogTimeoutProcedure.accept(localWatchdogMillis), + localWatchdogMillis, MILLISECONDS); } refreshAllTables(); - if (watchdogJob != null) { - sched.cancelJob(watchdogJob); + if (watchdogFuture != null) { + watchdogFuture.cancel(true); } jvmIntrospectionContext.endSample(); final long cycleTimeNanos = System.nanoTime() - startTimeNanos; @@ -1670,7 +1674,7 @@ private void refreshTablesAndFlushNotifications() { Thread.yield(); } - waitForNextCycle(startTime, sched); + waitForNextCycle(startTimeNanos); } private void computeStatsAndLogCycle(final long cycleTimeNanos) { @@ -1754,24 +1758,25 @@ private void logSuppressedCycles() { * wait the remaining period. *

* - * @param startTime The start time of the last run cycle - * @param timeSource The source of time that startTime was based on + * @param startTimeNanos The start time of the last run cycle as reported by {@link System#nanoTime()} */ - private void waitForNextCycle(final long startTime, final Scheduler timeSource) { - final long now = timeSource.currentTimeMillis(); - long expectedEndTime = startTime + targetCycleDurationMillis; + private void waitForNextCycle(final long startTimeNanos) { + final long nowNanos = System.nanoTime(); + long expectedEndTimeNanos = startTimeNanos + MILLISECONDS.toNanos(targetCycleDurationMillis); if (minimumInterCycleSleep > 0) { - expectedEndTime = Math.max(expectedEndTime, now + minimumInterCycleSleep); + expectedEndTimeNanos = + Math.max(expectedEndTimeNanos, nowNanos + MILLISECONDS.toNanos(minimumInterCycleSleep)); } - if (expectedEndTime >= nextUpdatePerformanceTrackerFlushTime) { - nextUpdatePerformanceTrackerFlushTime = now + UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS; + if (expectedEndTimeNanos >= nextUpdatePerformanceTrackerFlushTimeNanos) { + nextUpdatePerformanceTrackerFlushTimeNanos = + nowNanos + MILLISECONDS.toNanos(UpdatePerformanceTracker.REPORT_INTERVAL_MILLIS); try { updatePerformanceTracker.flush(); } catch (Exception err) { log.error().append("Error flushing UpdatePerformanceTracker: ").append(err).endl(); } } - waitForEndTime(expectedEndTime, timeSource); + waitForEndTime(expectedEndTimeNanos); } /** @@ -1782,12 +1787,11 @@ private void waitForNextCycle(final long startTime, final Scheduler timeSource) * If the delay is interrupted for any other {@link InterruptedException reason}, it will be logged and continue to * wait the remaining period. * - * @param expectedEndTime The time which we should sleep until - * @param timeSource The source of time that startTime was based on + * @param expectedEndTimeNanos The time (as reported by {@link System#nanoTime()}) which we should sleep until */ - private void waitForEndTime(final long expectedEndTime, final Scheduler timeSource) { - long remainingMillis; - while ((remainingMillis = expectedEndTime - timeSource.currentTimeMillis()) > 0) { + private void waitForEndTime(final long expectedEndTimeNanos) { + long remainingNanos; + while ((remainingNanos = expectedEndTimeNanos - System.nanoTime()) > 0) { if (refreshRequested.get()) { return; } @@ -1795,8 +1799,10 @@ private void waitForEndTime(final long expectedEndTime, final Scheduler timeSour if (refreshRequested.get()) { return; } + final long millisToWait = remainingNanos / 1_000_000; + final int extraNanosToWait = (int) (remainingNanos - (millisToWait * 1_000_000)); try { - refreshRequested.wait(remainingMillis); + refreshRequested.wait(millisToWait, extraNanosToWait); } catch (final InterruptedException logAndIgnore) { log.warn().append("Interrupted while waiting on refreshRequested. Ignoring: ").append(logAndIgnore) .endl(); @@ -1994,7 +2000,7 @@ public static final class Builder { Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false); private long targetCycleDurationMillis = Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000); - private long minimumCycleDurationToLogNanos = TimeUnit.MILLISECONDS.toNanos( + private long minimumCycleDurationToLogNanos = MILLISECONDS.toNanos( Configuration.getInstance().getIntegerWithDefault(MINIMUM_CYCLE_DURATION_TO_LOG_MILLIS_PROP, 25)); private String name;