Skip to content

Commit

Permalink
BarrageSubscription/SnapshotImpl's Future's Manage Additional Livenes…
Browse files Browse the repository at this point in the history
…s Until First #get
  • Loading branch information
nbauernfeind committed Dec 13, 2023
1 parent 8614e20 commit b284a47
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -37,9 +38,7 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
Expand Down Expand Up @@ -360,15 +359,70 @@ public void onError(@NotNull final Throwable t) {
}
}

private static final AtomicIntegerFieldUpdater<SnapshotCompletableFuture> WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(SnapshotCompletableFuture.class, "wasReleased");

/**
* The Completable Future is used to encapsulate the concept that the table is filled with requested data.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this
* only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
*/
private class SnapshotCompletableFuture extends CompletableFuture<Table> {
volatile int wasReleased;

public SnapshotCompletableFuture() {
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSnapshotImpl.this.cancel("cancelled by user");
return true;
}

return false;
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

private void maybeRelease() {
if (WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.chunk.ChunkType;
import io.deephaven.engine.exceptions.RequestCancelledException;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.liveness.*;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
Expand All @@ -26,6 +26,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.function.ThrowingSupplier;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Context;
Expand All @@ -42,10 +43,9 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

/**
* This class is an intermediary helper class that uses a {@code DoExchange} to populate a {@link BarrageTable} using
Expand Down Expand Up @@ -456,30 +456,141 @@ private interface FutureAdapter extends Future<Table> {
boolean completeExceptionally(Throwable ex);
}

private static final AtomicIntegerFieldUpdater<CompletableFutureAdapter> CF_WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(CompletableFutureAdapter.class, "wasReleased");

/**
* The Completable Future is used when this thread is not blocking the update graph progression.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future. Note that this
* only protects the getters on {@link Future} not the entire {@link CompletionStage} interface.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
*/
private class CompletableFutureAdapter extends CompletableFuture<Table> implements FutureAdapter {

volatile int wasReleased;

public CompletableFutureAdapter() {
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
return false;
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

private void maybeRelease() {
if (CF_WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
}
}
}

private static final AtomicIntegerFieldUpdater<UpdateGraphAwareFutureAdapter> UG_WAS_RELEASED =
AtomicIntegerFieldUpdater.newUpdater(UpdateGraphAwareFutureAdapter.class, "wasReleased");

/**
* The Update Graph Aware Future is used when waiting directly on this thread would otherwise be blocking update
* graph progression.
* <p>
* We will keep the result table alive until the user calls {@link Future#get get()} on the future.
* <p>
* Subsequent calls to {@link Future#get get()} will only succeed if the result is still alive and will increase the
* the reference count of the result table.
*/
private class UpdateGraphAwareFutureAdapter extends UpdateGraphAwareCompletableFuture<Table>
implements FutureAdapter {

volatile int wasReleased;

public UpdateGraphAwareFutureAdapter(@NotNull final UpdateGraph updateGraph) {
super(updateGraph);
resultTable.incrementReferenceCount();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
maybeRelease();
if (super.cancel(mayInterruptIfRunning)) {
BarrageSubscriptionImpl.this.cancel("cancelled by user");
return true;
}
return false;
}

@Override
public Table get(final long timeout, @NotNull final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
try {
final Table result = super.get(timeout, unit);

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

@Override
public Table get() throws InterruptedException, ExecutionException {
try {
final Table result = super.get();

if (result instanceof LivenessArtifact) {
((LivenessArtifact) result).manageWithCurrentScope();
}

return result;
} finally {
maybeRelease();
}
}

private void maybeRelease() {
if (UG_WAS_RELEASED.compareAndSet(this, 0, 1)) {
resultTable.decrementReferenceCount();
}
}
}
}

0 comments on commit b284a47

Please sign in to comment.