Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add register to CleanupReferenceProcessor #6213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import io.deephaven.base.reference.CleanupReference;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.Utils;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.internal.log.LoggerFactory;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Utility for draining a reference queue of {@link CleanupReference}s and invoking their cleanup methods.
Expand Down Expand Up @@ -57,9 +64,9 @@ public interface ExceptionHandler {
private final ExceptionHandler exceptionHandler;

/**
* The reference queue from the most recent initialization.
* The drain queue from the most recent initialization.
*/
private volatile ReferenceQueue<?> referenceQueue;
private volatile DrainQueue drainQueue;

/**
* The cleaner thread from the most recent initialization, guarded by the lock on {@code this}.
Expand Down Expand Up @@ -98,29 +105,113 @@ public CleanupReferenceProcessor(
* {@link CleanupReferenceProcessor} instance
*/
public <RT> ReferenceQueue<RT> getReferenceQueue() {
ReferenceQueue localQueue;
if ((localQueue = referenceQueue) == null) {
return getDrainQueue().referenceQueue();
}

private DrainQueue getDrainQueue() {
DrainQueue localQueue;
if ((localQueue = drainQueue) == null) {
synchronized (this) {
if ((localQueue = referenceQueue) == null) {
referenceQueue = localQueue = new ReferenceQueue<>();
cleanerThread = new Thread(new DrainQueue(localQueue),
if ((localQueue = drainQueue) == null) {
drainQueue = localQueue = new DrainQueue(new ReferenceQueue<>());
cleanerThread = new Thread(localQueue,
"CleanupReferenceProcessor-" + name + "-drainingThread");
cleanerThread.setDaemon(true);
cleanerThread.start();
}
}
}
// noinspection unchecked
return localQueue;
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes phantom
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become phantom reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become phantom reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes phantom reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerPhantom(T referent, Runnable action) {
return getDrainQueue().registerPhantom(referent, action);
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes weakly
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become weakly reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become weakly reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes weakly reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerWeak(T referent, Runnable action) {
return getDrainQueue().registerWeak(referent, action);
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes softly
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become softly reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become softly reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes softly reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerSoft(T referent, Runnable action) {
return getDrainQueue().registerSoft(referent, action);
}

/**
* Reset this instance so that the next call to {@link #getReferenceQueue()} will re-initialize it and provide a new
* queue. Results in the prompt termination of the daemon thread that may have been draining the existing queue.
*/
@TestUseOnly
public final synchronized void resetForUnitTests() {
referenceQueue = null;
drainQueue = null;
if (cleanerThread != null) {
cleanerThread.interrupt();
cleanerThread = null;
Expand All @@ -132,33 +223,139 @@ public final synchronized void resetForUnitTests() {
*/
private class DrainQueue implements Runnable {

private final ReferenceQueue<?> localQueue;
private final ReferenceQueue<?> referenceQueue;
private final Set<RegisteredCleanupReference<?>> registrations;

private DrainQueue(ReferenceQueue<?> localQueue) {
this.localQueue = localQueue;
private DrainQueue(ReferenceQueue<?> referenceQueue) {
this.referenceQueue = Objects.requireNonNull(referenceQueue);
this.registrations = Collections.newSetFromMap(new ConcurrentHashMap<>());
}

public <T> ReferenceQueue<T> referenceQueue() {
// noinspection unchecked
return (ReferenceQueue<T>) referenceQueue;
}

public <T> CleanupReference<T> registerPhantom(T referent, Runnable action) {
final PhantomCleanupRef<T> ref = new PhantomCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

public <T> CleanupReference<T> registerWeak(T referent, Runnable action) {
final WeakCleanupRef<T> ref = new WeakCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

public <T> CleanupReference<T> registerSoft(T referent, Runnable action) {
final SoftCleanupRef<T> ref = new SoftCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

@Override
public void run() {
while (localQueue == referenceQueue) {
while (this == drainQueue) {
final Reference<?> reference;
try {
reference = localQueue.remove(shutdownCheckDelayMillis);
reference = referenceQueue.remove(shutdownCheckDelayMillis);
} catch (InterruptedException ignored) {
continue;
}
if (reference instanceof CleanupReference) {
final CleanupReference<?> ref = (CleanupReference<?>) reference;
try {
if (LOG_CLEANED_REFERENCES) {
log.info().append("CleanupReferenceProcessor-").append(name).append(", cleaning ")
.append(Utils.REFERENT_FORMATTER, reference).endl();
}
((CleanupReference<?>) reference).cleanup();
ref.cleanup();
} catch (Exception e) {
exceptionHandler.accept(log, (CleanupReference<?>) reference, e);
exceptionHandler.accept(log, ref, e);
} finally {
if (ref instanceof RegisteredCleanupReference) {
registrations.remove(ref);
}
}
}
}
}
}

interface RegisteredCleanupReference<T> extends CleanupReference<T> {

}

private static class PhantomCleanupRef<T> extends PhantomReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

PhantomCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use interface defaulting to avoid this duplication....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there was more complex logic, I would consider it. In this case, I think per-impl is pretty straightforward.

final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}

private static class WeakCleanupRef<T> extends WeakReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

WeakCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}

private static class SoftCleanupRef<T> extends SoftReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

SoftCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.util.reference;

import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

public class CleanupReferenceProcessorTest {

@Test
public void registerPhantom() throws InterruptedException, TimeoutException {
register(CleanupReferenceProcessor.getDefault()::registerPhantom);
}

@Test
public void registerWeak() throws InterruptedException, TimeoutException {
register(CleanupReferenceProcessor.getDefault()::registerWeak);
}

@Ignore("Soft references are harder to test, as they are cleared out at the discretion of the garbage collector based on memory pressure")
@Test
public void registerSoft() throws InterruptedException, TimeoutException {
register(CleanupReferenceProcessor.getDefault()::registerSoft);
}

private static void register(BiFunction<Object, Runnable, ?> bf) throws InterruptedException, TimeoutException {
final CountDownLatch latch = new CountDownLatch(1);
{
final Object obj = new Object();
bf.apply(obj, latch::countDown);
}
for (int i = 0; i < 20; ++i) {
System.gc();
if (latch.await(100, TimeUnit.MILLISECONDS)) {
break;
}
}
if (latch.getCount() != 0) {
throw new TimeoutException();
}
}
}