Skip to content

Commit

Permalink
Updated but RCSM still not referenced properly.
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Sep 18, 2024
1 parent 72b03be commit 01e50fe
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
definition.getColumns() // This is the *re-written* definition passed to the super-class constructor
);
if (isRefreshing) {
// TODO: managing doesn't work here because columnSourceManager is at zero right now.
manage(columnSourceManager);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.liveness.StandaloneLivenessManager;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.hash.KeyedObjectHashMap;
Expand Down Expand Up @@ -134,7 +134,6 @@ protected synchronized void destroy() {
}

private final ImmutableTableKey tableKey;
private final ReferenceCountedLivenessNode livenessNode;

// Open transactions that are being accumulated
private final Map<Object, Transaction> transactions = Collections.synchronizedMap(new HashMap<>());
Expand All @@ -149,6 +148,13 @@ protected synchronized void destroy() {
private final KeyedObjectHashMap<TableLocationKey, TrackedKeySupplier> tableLocationKeyMap =
new KeyedObjectHashMap<>(TableLocationKeyDefinition.INSTANCE);

/**
* TLP are (currently) outside of liveness scopes, but they need to manage liveness referents to prevent them from
* going out of scope. The {@link StandaloneLivenessManager manager} will maintain the references until GC'd or the
* referents are not needed byt the TLP.
*/
private final StandaloneLivenessManager livenessManager;

private volatile boolean initialized;

private List<String> partitionKeys;
Expand All @@ -165,14 +171,7 @@ protected AbstractTableLocationProvider(@NotNull final TableKey tableKey, final
this.tableKey = tableKey.makeImmutable();
this.partitionKeys = null;

livenessNode = new ReferenceCountedLivenessNode(false) {
@Override
protected void destroy() {
AbstractTableLocationProvider.this.destroy();
}
};
// TODO: understand why this seems to be needed
LivenessScopeStack.peek().manage(livenessNode);
livenessManager = new StandaloneLivenessManager(false);
}

/**
Expand Down Expand Up @@ -282,7 +281,7 @@ protected void endTransaction(@NotNull final Object token) {
}
// Release the keys that were removed after we have delivered the notifications and the
// subscribers have had a chance to process them
removedKeys.forEach(livenessNode::unmanage);
removedKeys.forEach(livenessManager::unmanage);
}
}
}
Expand Down Expand Up @@ -368,7 +367,7 @@ protected void handleTableLocationKeyRemoved(
if (!supportsSubscriptions()) {
final TrackedKeySupplier trackedKey = tableLocationKeyMap.get(locationKey);
trackedKey.deactivate();
livenessNode.unmanage(trackedKey);
livenessManager.unmanage(trackedKey);
return;
}

Expand All @@ -383,7 +382,7 @@ protected void handleTableLocationKeyRemoved(
true)) {
onEmpty();
}
livenessNode.unmanage(trackedKey);
livenessManager.unmanage(trackedKey);
}
}
}
Expand All @@ -404,7 +403,7 @@ private TrackedKeySupplier observeInsert(@NotNull final TableLocationKey locatio
locationCreatedRecorder = true;

final TrackedKeySupplier trackedKey = toTrackedKey(locationKey);
livenessNode.manage(trackedKey);
livenessManager.manage(trackedKey);

return trackedKey;
}
Expand Down Expand Up @@ -571,9 +570,4 @@ private void releaseLocationKey(@NotNull final TrackedKeySupplier locationKey) {
// We can now remove the key from the tableLocations map
tableLocationKeyMap.removeKey(locationKey.get());
}

private void destroy() {
// TODO: release all the TTLK references. Or does that happen automatically?
throw new UnsupportedOperationException("Not yet implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ public class RegionedColumnSourceManager extends ReferenceCountedLivenessNode im
@NotNull final ColumnToCodecMappings codecMappings,
@NotNull final List<ColumnDefinition<?>> columnDefinitions) {
super(false);
// TODO: this seems to be required since the transition to ReferenceCountedLivenessNode (from LivenessArtifact)
// if (Liveness.REFERENCE_TRACKING_DISABLED) {
// return;
// }
LivenessScopeStack.peek().manage(this);

this.isRefreshing = isRefreshing;
this.columnDefinitions = columnDefinitions;
Expand Down Expand Up @@ -197,6 +192,7 @@ public class RegionedColumnSourceManager extends ReferenceCountedLivenessNode im
};
if (isRefreshing) {
rowSetModifiedColumnSet = includedLocationsTable.newModifiedColumnSet(ROWS_SET_COLUMN_NAME);
// TODO: managing doesn't work here because we are at zero right now.
manage(includedLocationsTable);
} else {
rowSetModifiedColumnSet = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.referencecounting.ReferenceCounted;
import org.jetbrains.annotations.NotNull;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Interface for objects that retainReference {@link LivenessReferent}s until such time as they are no longer necessary.
Expand Down Expand Up @@ -36,4 +38,53 @@ default void manage(@NotNull final LivenessReferent referent) {
* @return Whether the referent was in fact added
*/
boolean tryManage(@NotNull LivenessReferent referent);

/**
* If this manager manages {@code referent} one or more times, drop one such reference. If this manager is also a
* {@link LivenessReferent}, then it must also be live.
*
* @param referent The referent to drop
*/
@FinalDefault
default void unmanage(@NotNull LivenessReferent referent) {
if (!tryUnmanage(referent)) {
throw new LivenessStateException(this + " is no longer live and cannot unmanage " +
referent.getReferentDescription());
}
}

/**
* If this manager manages referent one or more times, drop one such reference. If this manager is also a
* {@link LivenessReferent}, then this method is a no-op if {@code this} is not live.
*
* @param referent The referent to drop
* @return Whether this node was live and thus in fact tried to drop a reference
*/
boolean tryUnmanage(@NotNull LivenessReferent referent);

/**
* For each referent in {@code referent}, if this manager manages referent one or more times, drop one such
* reference. If this manager is also a {@link LivenessReferent}, then it must also be live.
*
*
* @param referents The referents to drop
*/
@SuppressWarnings("unused")
@FinalDefault
default void unmanage(@NotNull Stream<? extends LivenessReferent> referents) {
if (!tryUnmanage(referents)) {
throw new LivenessStateException(this + " is no longer live and cannot unmanage " +
referents.map(LivenessReferent::getReferentDescription).collect(Collectors.joining()));
}
}

/**
* For each referent in referents, if this manager manages referent one or more times, drop one such reference. If
* this manager is also a {@link LivenessReferent}, then this method is a no-op if {@code this} is not live.
*
* @param referents The referents to drop
* @return Whether this node was live and thus in fact tried to drop a reference
*/
@SuppressWarnings("unused")
boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,8 @@
//
package io.deephaven.engine.liveness;

import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;

import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* A {@link LivenessReferent} that is also a {@link LivenessManager}, transitively enforcing liveness on its referents.
*/
public interface LivenessNode extends LivenessReferent, LivenessManager {

/**
* If this node manages {@code referent} one or more times, drop one such reference. This node must be live.
*
* @param referent The referent to drop
*/
@FinalDefault
default void unmanage(@NotNull LivenessReferent referent) {
if (!tryUnmanage(referent)) {
throw new LivenessStateException(this + " is no longer live and cannot unmanage " +
referent.getReferentDescription());
}
}

/**
* If this node is still live and manages referent one or more times, drop one such reference.
*
* @param referent The referent to drop
* @return Whether this node was live and thus in fact tried to drop a reference
*/
boolean tryUnmanage(@NotNull LivenessReferent referent);

/**
* For each referent in {@code referent}, if this node manages referent one or more times, drop one such reference.
* This node must be live.
*
* @param referents The referents to drop
*/
@SuppressWarnings("unused")
@FinalDefault
default void unmanage(@NotNull Stream<? extends LivenessReferent> referents) {
if (!tryUnmanage(referents)) {
throw new LivenessStateException(this + " is no longer live and cannot unmanage " +
referents.map(LivenessReferent::getReferentDescription).collect(Collectors.joining()));
}
}

/**
* For each referent in referents, if this node is still live and manages referent one or more times, drop one such
* reference.
*
* @param referents The referents to drop
* @return Whether this node was live and thus in fact tried to drop a reference
*/
@SuppressWarnings("unused")
boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.deephaven.util.Utils;
import org.jetbrains.annotations.NotNull;

import java.util.stream.Stream;

/**
* <p>
* A {@link LivenessManager} implementation that will never release its referents.
Expand All @@ -31,4 +33,14 @@ public boolean tryManage(@NotNull LivenessReferent referent) {
}
return true;
}

@Override
public boolean tryUnmanage(@NotNull LivenessReferent referent) {
throw new UnsupportedOperationException("PermanentLivenessManager cannot unmanage referents");
}

@Override
public boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents) {
throw new UnsupportedOperationException("PermanentLivenessManager cannot unmanage referents");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

/**
* {@link ReleasableLivenessManager} to manage exactly one object, passed at construction time or managed later.
Expand Down Expand Up @@ -61,6 +62,18 @@ public final boolean tryManage(@NotNull LivenessReferent referent) {
return true;
}

@Override
public boolean tryUnmanage(@NotNull LivenessReferent referent) {
throw new UnsupportedOperationException(
"SingletonLivenessManager should call release() instead of tryUnmanage()");
}

@Override
public boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents) {
throw new UnsupportedOperationException(
"SingletonLivenessManager should call release() instead of tryUnmanage()");
}

@Override
public final void release() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.liveness;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.util.Utils;
import io.deephaven.util.annotations.VisibleForTesting;
import org.jetbrains.annotations.NotNull;

import java.util.stream.Stream;

/**
* {@link LivenessNode} implementation that relies on reference counting to determine its liveness.
*/
public class StandaloneLivenessManager implements LivenessManager, LogOutputAppendable {

final boolean enforceStrongReachability;

transient RetainedReferenceTracker<StandaloneLivenessManager> tracker;

/**
* @param enforceStrongReachability Whether this {@link LivenessManager} should maintain strong references to its
* referents
*/
@SuppressWarnings("WeakerAccess") // Needed in order to deserialize Serializable subclass instances
public StandaloneLivenessManager(final boolean enforceStrongReachability) {
this.enforceStrongReachability = enforceStrongReachability;
initializeTransientFieldsForLiveness();
}

/**
* Package-private for {@link java.io.Serializable} sub-classes to use in <code>readObject</code> <em>only</em>.
* Public to allow unit tests in another package to work around mock issues where the constructor is never invoked.
*/
@VisibleForTesting
public final void initializeTransientFieldsForLiveness() {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return;
}
tracker = new RetainedReferenceTracker<>(this, enforceStrongReachability);
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: Created tracker ").append(Utils.REFERENT_FORMATTER, tracker)
.append(" for ").append(Utils.REFERENT_FORMATTER, this).endl();
}
}

@Override
public final boolean tryManage(@NotNull final LivenessReferent referent) {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return true;
}
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: ").append(this).append(" managing ")
.append(referent.getReferentDescription()).endl();
}
if (!referent.tryRetainReference()) {
return false;
}
tracker.addReference(referent);
return true;
}

@Override
public final boolean tryUnmanage(@NotNull final LivenessReferent referent) {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return true;
}
tracker.dropReference(referent);
return true;
}

@Override
public final boolean tryUnmanage(@NotNull final Stream<? extends LivenessReferent> referents) {
if (Liveness.REFERENCE_TRACKING_DISABLED) {
return true;
}
tracker.dropReferences(referents);
return true;
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput
.append("StandaloneLivenessManager{enforceStrongReachability=")
.append(enforceStrongReachability)
.append("}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,8 @@ public void testAutoRefreshingTable() throws ExecutionException, InterruptedExce
(IcebergTestTable) resourceCatalog.loadTable(tableId));
final DataInstructionsProviderLoader dataInstructionsProvider =
DataInstructionsProviderLoader.create(Map.of());
final IcebergTableAdapter tableAdapter = new IcebergTableAdapter(tableId, icebergTable, dataInstructionsProvider);
final IcebergTableAdapter tableAdapter =
new IcebergTableAdapter(tableId, icebergTable, dataInstructionsProvider);

final IcebergInstructions localInstructions = IcebergInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
Expand Down
Loading

0 comments on commit 01e50fe

Please sign in to comment.