diff --git a/Util/src/main/java/io/deephaven/util/reference/CleanupReferenceProcessor.java b/Util/src/main/java/io/deephaven/util/reference/CleanupReferenceProcessor.java index a452e9967e7..c4b613997df 100644 --- a/Util/src/main/java/io/deephaven/util/reference/CleanupReferenceProcessor.java +++ b/Util/src/main/java/io/deephaven/util/reference/CleanupReferenceProcessor.java @@ -6,14 +6,21 @@ import io.deephaven.base.reference.CleanupReference; import io.deephaven.base.verify.Require; import io.deephaven.configuration.Configuration; +import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.Utils; import io.deephaven.util.annotations.TestUseOnly; -import io.deephaven.internal.log.LoggerFactory; import org.jetbrains.annotations.NotNull; +import java.lang.ref.PhantomReference; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * Utility for draining a reference queue of {@link CleanupReference}s and invoking their cleanup methods. @@ -57,9 +64,9 @@ public interface ExceptionHandler { private final ExceptionHandler exceptionHandler; /** - * The reference queue from the most recent initialization. + * The drain queue from the most recent initialization. */ - private volatile ReferenceQueue referenceQueue; + private volatile DrainQueue drainQueue; /** * The cleaner thread from the most recent initialization, guarded by the lock on {@code this}. @@ -98,29 +105,113 @@ public CleanupReferenceProcessor( * {@link CleanupReferenceProcessor} instance */ public ReferenceQueue getReferenceQueue() { - ReferenceQueue localQueue; - if ((localQueue = referenceQueue) == null) { + return getDrainQueue().referenceQueue(); + } + + private DrainQueue getDrainQueue() { + DrainQueue localQueue; + if ((localQueue = drainQueue) == null) { synchronized (this) { - if ((localQueue = referenceQueue) == null) { - referenceQueue = localQueue = new ReferenceQueue<>(); - cleanerThread = new Thread(new DrainQueue(localQueue), + if ((localQueue = drainQueue) == null) { + drainQueue = localQueue = new DrainQueue(new ReferenceQueue<>()); + cleanerThread = new Thread(localQueue, "CleanupReferenceProcessor-" + name + "-drainingThread"); cleanerThread.setDaemon(true); cleanerThread.start(); } } } - // noinspection unchecked return localQueue; } + /** + * Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes phantom + * reachable. + * + *

+ * The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the + * {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when + * {@code referent} has become phantom reachable. The {@code action} will not be invoked more than once. + * + *

+ * The cleaning {@code action} must not refer to the {@code referent} being registered. If so, the + * {@code referent} will never become phantom reachable and the cleaning {@code action} will never be invoked + * automatically. + * + *

+ * Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit + * {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference + * processor will hold onto the reference. + * + * @param referent the object to monitor + * @param action a {@code Runnable} to invoke when the referent becomes phantom reachable + * @return a cleanup reference instance + */ + public CleanupReference registerPhantom(T referent, Runnable action) { + return getDrainQueue().registerPhantom(referent, action); + } + + /** + * Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes weakly + * reachable. + * + *

+ * The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the + * {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when + * {@code referent} has become weakly reachable. The {@code action} will not be invoked more than once. + * + *

+ * The cleaning {@code action} must not refer to the {@code referent} being registered. If so, the + * {@code referent} will never become weakly reachable and the cleaning {@code action} will never be invoked + * automatically. + * + *

+ * Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit + * {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference + * processor will hold onto the reference. + * + * @param referent the object to monitor + * @param action a {@code Runnable} to invoke when the referent becomes weakly reachable + * @return a cleanup reference instance + */ + public CleanupReference registerWeak(T referent, Runnable action) { + return getDrainQueue().registerWeak(referent, action); + } + + /** + * Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes softly + * reachable. + * + *

+ * The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the + * {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when + * {@code referent} has become softly reachable. The {@code action} will not be invoked more than once. + * + *

+ * The cleaning {@code action} must not refer to the {@code referent} being registered. If so, the + * {@code referent} will never become softly reachable and the cleaning {@code action} will never be invoked + * automatically. + * + *

+ * Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit + * {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference + * processor will hold onto the reference. + * + * @param referent the object to monitor + * @param action a {@code Runnable} to invoke when the referent becomes softly reachable + * @return a cleanup reference instance + */ + public CleanupReference registerSoft(T referent, Runnable action) { + return getDrainQueue().registerSoft(referent, action); + } + /** * Reset this instance so that the next call to {@link #getReferenceQueue()} will re-initialize it and provide a new * queue. Results in the prompt termination of the daemon thread that may have been draining the existing queue. */ @TestUseOnly public final synchronized void resetForUnitTests() { - referenceQueue = null; + drainQueue = null; if (cleanerThread != null) { cleanerThread.interrupt(); cleanerThread = null; @@ -132,33 +223,139 @@ public final synchronized void resetForUnitTests() { */ private class DrainQueue implements Runnable { - private final ReferenceQueue localQueue; + private final ReferenceQueue referenceQueue; + private final Set> registrations; - private DrainQueue(ReferenceQueue localQueue) { - this.localQueue = localQueue; + private DrainQueue(ReferenceQueue referenceQueue) { + this.referenceQueue = Objects.requireNonNull(referenceQueue); + this.registrations = Collections.newSetFromMap(new ConcurrentHashMap<>()); + } + + public ReferenceQueue referenceQueue() { + // noinspection unchecked + return (ReferenceQueue) referenceQueue; + } + + public CleanupReference registerPhantom(T referent, Runnable action) { + final PhantomCleanupRef ref = new PhantomCleanupRef<>(referent, referenceQueue(), action); + registrations.add(ref); + return ref; + } + + public CleanupReference registerWeak(T referent, Runnable action) { + final WeakCleanupRef ref = new WeakCleanupRef<>(referent, referenceQueue(), action); + registrations.add(ref); + return ref; + } + + public CleanupReference registerSoft(T referent, Runnable action) { + final SoftCleanupRef ref = new SoftCleanupRef<>(referent, referenceQueue(), action); + registrations.add(ref); + return ref; } @Override public void run() { - while (localQueue == referenceQueue) { + while (this == drainQueue) { final Reference reference; try { - reference = localQueue.remove(shutdownCheckDelayMillis); + reference = referenceQueue.remove(shutdownCheckDelayMillis); } catch (InterruptedException ignored) { continue; } if (reference instanceof CleanupReference) { + final CleanupReference ref = (CleanupReference) reference; try { if (LOG_CLEANED_REFERENCES) { log.info().append("CleanupReferenceProcessor-").append(name).append(", cleaning ") .append(Utils.REFERENT_FORMATTER, reference).endl(); } - ((CleanupReference) reference).cleanup(); + ref.cleanup(); } catch (Exception e) { - exceptionHandler.accept(log, (CleanupReference) reference, e); + exceptionHandler.accept(log, ref, e); + } finally { + if (ref instanceof RegisteredCleanupReference) { + registrations.remove(ref); + } } } } } } + + interface RegisteredCleanupReference extends CleanupReference { + + } + + private static class PhantomCleanupRef extends PhantomReference implements RegisteredCleanupReference { + private Runnable action; + + PhantomCleanupRef(T referent, ReferenceQueue q, Runnable action) { + super(referent, q); + this.action = Objects.requireNonNull(action); + Reference.reachabilityFence(referent); + Reference.reachabilityFence(q); + } + + @Override + public void cleanup() { + final Runnable cleanup; + synchronized (this) { + if (action == null) { + return; + } + cleanup = action; + action = null; + } + cleanup.run(); + } + } + + private static class WeakCleanupRef extends WeakReference implements RegisteredCleanupReference { + private Runnable action; + + WeakCleanupRef(T referent, ReferenceQueue q, Runnable action) { + super(referent, q); + this.action = Objects.requireNonNull(action); + Reference.reachabilityFence(referent); + Reference.reachabilityFence(q); + } + + @Override + public void cleanup() { + final Runnable cleanup; + synchronized (this) { + if (action == null) { + return; + } + cleanup = action; + action = null; + } + cleanup.run(); + } + } + + private static class SoftCleanupRef extends SoftReference implements RegisteredCleanupReference { + private Runnable action; + + SoftCleanupRef(T referent, ReferenceQueue q, Runnable action) { + super(referent, q); + this.action = Objects.requireNonNull(action); + Reference.reachabilityFence(referent); + Reference.reachabilityFence(q); + } + + @Override + public void cleanup() { + final Runnable cleanup; + synchronized (this) { + if (action == null) { + return; + } + cleanup = action; + action = null; + } + cleanup.run(); + } + } } diff --git a/Util/src/test/java/io/deephaven/util/reference/CleanupReferenceProcessorTest.java b/Util/src/test/java/io/deephaven/util/reference/CleanupReferenceProcessorTest.java new file mode 100644 index 00000000000..71711ea4bde --- /dev/null +++ b/Util/src/test/java/io/deephaven/util/reference/CleanupReferenceProcessorTest.java @@ -0,0 +1,48 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.util.reference; + +import org.junit.Ignore; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; + +public class CleanupReferenceProcessorTest { + + @Test + public void registerPhantom() throws InterruptedException, TimeoutException { + register(CleanupReferenceProcessor.getDefault()::registerPhantom); + } + + @Test + public void registerWeak() throws InterruptedException, TimeoutException { + register(CleanupReferenceProcessor.getDefault()::registerWeak); + } + + @Ignore("Soft references are harder to test, as they are cleared out at the discretion of the garbage collector based on memory pressure") + @Test + public void registerSoft() throws InterruptedException, TimeoutException { + register(CleanupReferenceProcessor.getDefault()::registerSoft); + } + + private static void register(BiFunction bf) throws InterruptedException, TimeoutException { + final CountDownLatch latch = new CountDownLatch(1); + { + final Object obj = new Object(); + bf.apply(obj, latch::countDown); + } + for (int i = 0; i < 20; ++i) { + System.gc(); + if (latch.await(100, TimeUnit.MILLISECONDS)) { + break; + } + } + if (latch.getCount() != 0) { + throw new TimeoutException(); + } + } +}