Skip to content

Commit

Permalink
Moved transaction accumulation to AbstractTableLocationProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
lbooker42 committed Aug 26, 2024
1 parent cd31d82 commit d680c0c
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ interface Listener extends BasicTableDataListener {
* </p>
*
* @param tableLocationKey The new table location key.
* @param transactionToken The token identifying the transaction.
* @param transactionToken The token identifying the transaction, or {@code null} if this addition is not part
* of a transaction.
*/
void handleTableLocationKeyAdded(
@NotNull ImmutableTableLocationKey tableLocationKey,
Expand All @@ -60,7 +61,8 @@ void handleTableLocationKeyAdded(
/**
* Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
* subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
* guaranteed. Uses {@code this} as the token.
* guaranteed. This addition is not part of any transaction, and is equivalent to
* {@code handleTableLocationKeyAdded(tableLocationKey, null);} by default.
*
* @param tableLocationKey The new table location key.
*/
Expand All @@ -77,14 +79,16 @@ default void handleTableLocationKeyAdded(@NotNull ImmutableTableLocationKey tabl
* </p>
*
* @param tableLocationKey The table location key that was removed.
* @param transactionToken The token identifying the transaction.
* @param transactionToken The token identifying the transaction, or {@code null} if this addition is not part
* of a transaction.
*/
void handleTableLocationKeyRemoved(
@NotNull ImmutableTableLocationKey tableLocationKey,
@Nullable Object transactionToken);

/**
* Notify the listener of a {@link TableLocationKey} that has been removed. Uses {@code this} as the token.
* Notify the listener of a {@link TableLocationKey} that has been removed. This addition is not part of any
* transaction, and is equivalent to {@code handleTableLocationKeyAdded(tableLocationKey, null);} by default.
*
* @param tableLocationKey The table location key that was removed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.locations.*;
Expand All @@ -30,12 +29,41 @@ public abstract class AbstractTableLocationProvider

private static final Set<TableLocationKey> EMPTY_TABLE_LOCATION_KEYS = Collections.emptySet();

/**
* Helper class to manage a transaction of added and removed location keys.
*/
private static class Transaction {
Set<TableLocationKey> locationsAdded = EMPTY_TABLE_LOCATION_KEYS;
Set<TableLocationKey> locationsRemoved = EMPTY_TABLE_LOCATION_KEYS;

void addlocationKey(TableLocationKey locationKey) {
if (locationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
locationsAdded = new HashSet<>();
} else if (locationsAdded.contains(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was already added in this transaction.");
}
locationsAdded.add(locationKey);
}

void removeLocationKey(TableLocationKey locationKey) {
if (locationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
locationsRemoved = new HashSet<>();
} else if (locationsRemoved.contains(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was already removed and has not been replaced.");
} else if (locationsAdded.contains(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was removed after being added in the same transaction.");
}
locationsRemoved.add(locationKey);
}
}

private final ImmutableTableKey tableKey;

// These sets represent open transactions that are being accumulated.
private final Set<Object> transactionTokens = new HashSet<>();
private final Map<Object, Set<TableLocationKey>> accumulatedLocationsAdded = new HashMap<>();
private final Map<Object, Set<TableLocationKey>> accumulatedLocationsRemoved = new HashMap<>();
private final Map<Object, Transaction> transactions = new HashMap<>();

private final Object transactionLock = new Object();

Expand Down Expand Up @@ -114,10 +142,6 @@ protected final void deliverInitialSnapshot(@NotNull final TableLocationProvider
listener.handleTableLocationKeysUpdate(unmodifiableTableLocationKeys, null);
}

protected final void handleTableLocationKeyAdded(@NotNull final TableLocationKey locationKey) {
handleTableLocationKeyAdded(locationKey, null);
}

/**
* Internal method to begin an atomic transaction of location adds and removes.
*
Expand All @@ -126,17 +150,10 @@ protected final void handleTableLocationKeyAdded(@NotNull final TableLocationKey
protected void beginTransaction(@NotNull final Object token) {
synchronized (transactionLock) {
// Verify that we can start a new transaction with this token.
if (transactionTokens.contains(token)) {
if (transactions.containsKey(token)) {
throw new IllegalStateException("A transaction with token " + token + " is currently open.");
}
Assert.eqFalse(accumulatedLocationsAdded.containsKey(token),
"accumulatedLocationsAdded.containsKey(token)");
Assert.eqFalse(accumulatedLocationsRemoved.containsKey(token),
"accumulatedLocationsRemoved.containsKey(token)");

transactionTokens.add(token);
accumulatedLocationsAdded.put(token, EMPTY_TABLE_LOCATION_KEYS);
accumulatedLocationsRemoved.put(token, EMPTY_TABLE_LOCATION_KEYS);
transactions.put(token, new Transaction());
}
}

Expand All @@ -146,26 +163,26 @@ protected void beginTransaction(@NotNull final Object token) {
* @param token A token to identify the transaction
*/
protected void endTransaction(@NotNull final Object token) {
final Set<TableLocationKey> locationsAdded;
final Set<TableLocationKey> locationsRemoved;
final Transaction transaction;
synchronized (transactionLock) {
// Verify that this transaction is open.
if (!transactionTokens.remove(token)) {
transaction = transactions.remove(token);
if (transaction == null) {
throw new IllegalStateException("No transaction with token " + token + " is currently open.");
}

locationsAdded = accumulatedLocationsAdded.remove(token);
locationsRemoved = accumulatedLocationsRemoved.remove(token);
}

final Collection<ImmutableTableLocationKey> addedImmutableKeys = new ArrayList<>(locationsAdded.size());
final Collection<ImmutableTableLocationKey> removedImmutableKeys = new ArrayList<>(locationsRemoved.size());
final Collection<ImmutableTableLocationKey> addedImmutableKeys =
new ArrayList<>(transaction.locationsAdded.size());
final Collection<ImmutableTableLocationKey> removedImmutableKeys =
new ArrayList<>(transaction.locationsRemoved.size());

// Process the accumulated adds and removes under a lock on `tableLocations` to keep modifications atomic to
// other holders of this lock.
synchronized (tableLocations) {
if (locationsAdded != EMPTY_TABLE_LOCATION_KEYS || locationsRemoved != EMPTY_TABLE_LOCATION_KEYS) {
for (TableLocationKey locationKey : locationsAdded) {
if (transaction.locationsAdded != EMPTY_TABLE_LOCATION_KEYS
|| transaction.locationsRemoved != EMPTY_TABLE_LOCATION_KEYS) {
for (TableLocationKey locationKey : transaction.locationsAdded) {
locationCreatedRecorder = false;
final Object result = tableLocations.putIfAbsent(locationKey, this::observeInsert);
visitLocationKey(locationKey);
Expand All @@ -175,7 +192,7 @@ protected void endTransaction(@NotNull final Object token) {
}
}

for (TableLocationKey locationKey : locationsRemoved) {
for (TableLocationKey locationKey : transaction.locationsRemoved) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
maybeClearLocationForRemoval(removedLocation);
Expand All @@ -190,10 +207,10 @@ protected void endTransaction(@NotNull final Object token) {
// Push the notifications to the subscribers.
if ((!addedImmutableKeys.isEmpty() || !removedImmutableKeys.isEmpty())
&& subscriptions.deliverNotification(
Listener::handleTableLocationKeysUpdate,
addedImmutableKeys,
removedImmutableKeys,
true)) {
Listener::handleTableLocationKeysUpdate,
addedImmutableKeys,
removedImmutableKeys,
true)) {
onEmpty();
}
}
Expand All @@ -204,7 +221,17 @@ protected void endTransaction(@NotNull final Object token) {
* Deliver a possibly-new key.
*
* @param locationKey The new key
* @param transactionToken The token identifying the transaction
* @apiNote This method is intended to be used by subclasses or by tightly-coupled discovery tools.
*/
protected final void handleTableLocationKeyAdded(@NotNull final TableLocationKey locationKey) {
handleTableLocationKeyAdded(locationKey, null);
}

/**
* Deliver a possibly-new key, optionally as part of a transaction.
*
* @param locationKey The new key
* @param transactionToken The token identifying the transaction (or null if not part of a transaction)
* @apiNote This method is intended to be used by subclasses or by tightly-coupled discovery tools.
*/
protected final void handleTableLocationKeyAdded(
Expand All @@ -213,7 +240,7 @@ protected final void handleTableLocationKeyAdded(

if (!supportsSubscriptions()) {
tableLocations.putIfAbsent(locationKey, TableLocationKey::makeImmutable);
visitLocationKey(toKeyImmutable(locationKey));
visitLocationKey(locationKey);
return;
}

Expand All @@ -224,16 +251,12 @@ protected final void handleTableLocationKeyAdded(
// 2. If the location was already removed in this transaction, we have a `replace` operation which is not a
// logical error (although it may not be supported by all consumers).
synchronized (transactionLock) {
if (accumulatedLocationsAdded.get(transactionToken) == EMPTY_TABLE_LOCATION_KEYS) {
accumulatedLocationsAdded.put(transactionToken, new HashSet<>());
}
final Set<TableLocationKey> locationsAdded = accumulatedLocationsAdded.get(transactionToken);

if (accumulatedLocationsAdded.containsKey(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was added multiple times in the same transaction.");
final Transaction transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
locationsAdded.add(locationKey);
transaction.addlocationKey(locationKey);
}
return;
}
Expand Down Expand Up @@ -388,6 +411,12 @@ public void removeTableLocationKey(@NotNull final TableLocationKey locationKey)
handleTableLocationKeyRemoved(locationKey, null);
}

/**
* Handle a removal. Notify subscribers that {@code locationKey} was removed if necessary. See
* {@link #removeTableLocationKey(TableLocationKey)} for additional discussions of semantics.
*
* @param locationKey the TableLocation that was removed
*/
protected final void handleTableLocationKeyRemoved(@NotNull final TableLocationKey locationKey) {
handleTableLocationKeyRemoved(locationKey, null);
}
Expand All @@ -397,7 +426,7 @@ protected final void handleTableLocationKeyRemoved(@NotNull final TableLocationK
* necessary. See {@link #removeTableLocationKey(TableLocationKey)} for additional discussions of semantics.
*
* @param locationKey the TableLocation that was removed
* @param transactionToken The token identifying the transaction
* @param transactionToken The token identifying the transaction (or null if not part of a transaction)
*/
protected void handleTableLocationKeyRemoved(
@NotNull final TableLocationKey locationKey,
Expand All @@ -414,19 +443,12 @@ protected void handleTableLocationKeyRemoved(
// add then remove the same location.
if (transactionToken != null) {
synchronized (transactionLock) {
if (accumulatedLocationsRemoved.get(transactionToken) == EMPTY_TABLE_LOCATION_KEYS) {
accumulatedLocationsRemoved.put(transactionToken, new HashSet<>());
}
final Set<TableLocationKey> locationsRemoved = accumulatedLocationsRemoved.get(transactionToken);

if (accumulatedLocationsRemoved.containsKey(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was removed multiple times in the same transaction.");
} else if (accumulatedLocationsAdded.containsKey(locationKey)) {
throw new IllegalStateException("TableLocationKey " + locationKey
+ " was removed after being added in the same transaction.");
final Transaction transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
locationsRemoved.add(locationKey);
transaction.removeLocationKey(locationKey);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ public void subscribe(@NotNull final Listener listener) {
p.subscribe(listener);
} else {
p.refresh();
listener.beginTransaction(p);
p.getTableLocationKeys().forEach(tlk -> listener.handleTableLocationKeyAdded(tlk, p));
listener.endTransaction(p);
p.getTableLocationKeys().forEach(listener::handleTableLocationKeyAdded);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashSet;
import java.util.Set;

/**
* Polling-driven {@link TableLocationProvider} implementation that delegates {@link TableLocationKey location key}
* discovery to a {@link TableLocationKeyFinder} and {@link TableLocation location} creation to a
Expand Down Expand Up @@ -46,19 +43,9 @@ public String getImplementationName() {
return IMPLEMENTATION_NAME;
}

// The simplest way to support "push" of new data availability is to provide a callback to the user that just calls
// `refresh`, which would need to become synchronized. Alternatively, we could make an Iceberg-specific aTLP
// implementation that exposes a more specific callback, e.g. with a snapshot ID, as well as the option to disable
// polling. We do need a mechanism to avoid going backwards, probably.
@Override
public void refresh() {
final Set<ImmutableTableLocationKey> missedKeys = new HashSet<>(getTableLocationKeys());
locationKeyFinder.findKeys(tableLocationKey -> {
// noinspection SuspiciousMethodCalls
missedKeys.remove(tableLocationKey);
handleTableLocationKeyAdded(tableLocationKey);
});
missedKeys.forEach(this::handleTableLocationKeyRemoved);
locationKeyFinder.findKeys(this::handleTableLocationKeyAdded);
setInitialized();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void handleTableLocationKeyRemoved(
// Verify that we don't have stacked removes (without intervening adds).
if (pendingLocationsRemoved.contains(tableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + tableLocationKey
+ " was already removed by a previous transaction.");
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
Expand All @@ -185,7 +185,7 @@ public void handleTableLocationKeysUpdate(
// Verify that we don't have stacked removes.
if (pendingLocationsRemoved.contains(removedTableLocationKey)) {
throw new IllegalStateException("TableLocationKey " + removedTableLocationKey
+ " was already removed by a previous transaction.");
+ " was already removed and has not been replaced.");
}
if (pendingLocationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
pendingLocationsRemoved = new HashSet<>();
Expand Down
Loading

0 comments on commit d680c0c

Please sign in to comment.