Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support refreshing Iceberg tables #5707

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
470b09c
Initial commit of refreshing Iceberg.
lbooker42 Jul 2, 2024
a8d957a
Rebased to main.
lbooker42 Jul 2, 2024
264fdb1
Change IcebergInstructions refreshing indicator to enum instead of bo…
lbooker42 Jul 2, 2024
58d0a73
WIP, for review
lbooker42 Jul 3, 2024
e090474
Manual and auto-refreshing working, better documentation.
lbooker42 Jul 23, 2024
57021ad
Addressed more PR comments, some remaining.
lbooker42 Jul 23, 2024
fb882e8
WIP, some PR comments addressed.
lbooker42 Jul 26, 2024
5bbdeb2
WIP, even more PR comments addressed.
lbooker42 Jul 27, 2024
3da205c
Nearly all PR comments addressed.
lbooker42 Jul 27, 2024
91acf9b
merged with main
lbooker42 Jul 27, 2024
08dd329
Adjustment to IcebergInstructions update mode.
lbooker42 Jul 29, 2024
7af0d1d
Added python wrapper for Iceberg refreshing tables.
lbooker42 Jul 29, 2024
2d79c38
Changes to mocked tests for ColumnSourceManager and PartitionAwareSou…
lbooker42 Jul 29, 2024
3809f21
Added DHError handler and add'l documentation to python `snapshots()`…
lbooker42 Jul 30, 2024
5273a15
Fixed typo in JavaDoc
lbooker42 Jul 30, 2024
b9e2c6e
WIP
lbooker42 Jul 31, 2024
9937f79
Suggestion from review
lbooker42 Jul 31, 2024
cd08038
WIP, changes to revert some transaction token code.
lbooker42 Jul 31, 2024
f28325f
Correct logic across multiple transactions.
lbooker42 Aug 21, 2024
2d92b3f
Merged with main
lbooker42 Aug 21, 2024
cd31d82
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 23, 2024
d680c0c
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 26, 2024
6607fc3
PR comments addressed.
lbooker42 Aug 28, 2024
893336f
Updated to use IcebergTableAdapter and exposed in python. Addressed P…
lbooker42 Aug 30, 2024
68e4546
Incorporated external PR to update PartitioningColumnDataIndex for re…
lbooker42 Aug 30, 2024
273f5c1
Added additional snapshots with removes to IcebergToolsTest resources.
lbooker42 Sep 3, 2024
1e92a19
Merge branch 'main' into lab-iceberg-refreshing
lbooker42 Sep 3, 2024
f72c1b7
Manual and auto refreshing tests for Iceberg.
lbooker42 Sep 4, 2024
5c7ff12
Manual and auto refreshing tests for Iceberg, not passing.
lbooker42 Sep 4, 2024
92eec61
PR comments addressed.
lbooker42 Sep 4, 2024
95194b7
Implemented improved location reference counting in AbstractTableLoca…
lbooker42 Sep 5, 2024
09e2b6e
Fixing doc problem.
lbooker42 Sep 5, 2024
30910dd
For review only, does not compile :(
lbooker42 Sep 12, 2024
dd12240
Compiles now, still many problems
lbooker42 Sep 13, 2024
944dac6
Working through problems.
lbooker42 Sep 13, 2024
912b9f2
Cleanup and minor changes
lbooker42 Sep 13, 2024
e2b6fd0
Refactored ATLP
lbooker42 Sep 18, 2024
72b03be
Merged with main.
lbooker42 Sep 18, 2024
01e50fe
Updated but RCSM still not referenced properly.
lbooker42 Sep 18, 2024
81e88d8
Refreshing tests still need work.
lbooker42 Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Partial {@link TableLocationProvider} implementation for standalone use or as part of a {@link TableDataService}.
* <p>
* Presents an interface similar to {@link TableLocationProvider.Listener} for subclasses to use when communicating with
* the parent; see {@link #handleTableLocationKeyAdded(TableLocationKey, Object).
* the parent; see {@link #handleTableLocationKeyAdded(TableLocationKey, Object)}.
* <p>
* Note that subclasses are responsible for determining when it's appropriate to call {@link #setInitialized()} and/or
* override {@link #doInitialization()}.
Expand All @@ -34,7 +34,7 @@ private static class Transaction {
Set<ImmutableTableLocationKey> locationsAdded = EMPTY_TABLE_LOCATION_KEYS;
Set<ImmutableTableLocationKey> locationsRemoved = EMPTY_TABLE_LOCATION_KEYS;

void addlocationKey(ImmutableTableLocationKey locationKey) {
synchronized void addLocationKey(ImmutableTableLocationKey locationKey) {
if (locationsAdded == EMPTY_TABLE_LOCATION_KEYS) {
locationsAdded = new HashSet<>();
}
Expand All @@ -49,7 +49,7 @@ void addlocationKey(ImmutableTableLocationKey locationKey) {
}
}

void removeLocationKey(ImmutableTableLocationKey locationKey) {
synchronized void removeLocationKey(ImmutableTableLocationKey locationKey) {
if (locationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
locationsRemoved = new HashSet<>();
}
Expand All @@ -72,7 +72,7 @@ void removeLocationKey(ImmutableTableLocationKey locationKey) {
private final ImmutableTableKey tableKey;

// Open transactions that are being accumulated
private final Map<Object, Transaction> transactions = new HashMap<>();
private final Map<Object, Transaction> transactions = Collections.synchronizedMap(new HashMap<>());

/**
* Map from {@link TableLocationKey} to itself, or to a {@link TableLocation}. The values are {@link TableLocation}s
Expand Down Expand Up @@ -140,15 +140,13 @@ protected final void deliverInitialSnapshot(@NotNull final TableLocationProvider
* @param token A token to identify the transaction
*/
protected void beginTransaction(@NotNull final Object token) {
synchronized (transactions) {
// Verify that we can start a new transaction with this token.
transactions.compute(token, (key, val) -> {
if (val != null) {
throw new IllegalStateException("A transaction with token " + token + " is already open.");
}
return new Transaction();
});
}
// Verify that we can start a new transaction with this token.
transactions.compute(token, (key, val) -> {
if (val != null) {
throw new IllegalStateException("A transaction with token " + token + " is already open.");
}
return new Transaction();
});
}

/**
Expand All @@ -157,40 +155,42 @@ protected void beginTransaction(@NotNull final Object token) {
* @param token A token to identify the transaction
*/
protected void endTransaction(@NotNull final Object token) {
final Transaction transaction;
synchronized (transactions) {
// Verify that this transaction is open.
transaction = transactions.remove(token);
if (transaction == null) {
throw new IllegalStateException("No transaction with token " + token + " is currently open.");
}
// Verify that this transaction is open.
final Transaction transaction = transactions.remove(token);
if (transaction == null) {
throw new IllegalStateException("No transaction with token " + token + " is currently open.");
}

final Collection<ImmutableTableLocationKey> addedKeys =
new ArrayList<>(transaction.locationsAdded.size());
final Collection<ImmutableTableLocationKey> removedKeys =
new ArrayList<>(transaction.locationsRemoved.size());

// Return early if there are no changes to process.
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
if (transaction.locationsAdded == EMPTY_TABLE_LOCATION_KEYS
&& transaction.locationsRemoved == EMPTY_TABLE_LOCATION_KEYS) {
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

for (ImmutableTableLocationKey locationKey : transaction.locationsRemoved) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
removedKeys.add(locationKey);
final Collection<ImmutableTableLocationKey> addedKeys =
new ArrayList<>(transaction.locationsAdded.size());
final Collection<ImmutableTableLocationKey> removedKeys =
new ArrayList<>(transaction.locationsRemoved.size());

synchronized (tableLocations) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
for (ImmutableTableLocationKey locationKey : transaction.locationsRemoved) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
if (removedLocation instanceof AbstractTableLocation) {
((AbstractTableLocation) removedLocation).decrementReferenceCount();
}
removedKeys.add(locationKey);
}
}
}

for (ImmutableTableLocationKey locationKey : transaction.locationsAdded) {
locationCreatedRecorder = false;
tableLocations.putIfAbsent(locationKey, this::observeInsert);
visitLocationKey(locationKey);
if (locationCreatedRecorder) {
verifyPartitionKeys(locationKey);
addedKeys.add(locationKey);
for (ImmutableTableLocationKey locationKey : transaction.locationsAdded) {
locationCreatedRecorder = false;
tableLocations.putIfAbsent(locationKey, this::observeInsert);
visitLocationKey(locationKey);
if (locationCreatedRecorder) {
verifyPartitionKeys(locationKey);
addedKeys.add(locationKey);
}
}
}

Expand Down Expand Up @@ -230,16 +230,13 @@ protected final void handleTableLocationKeyAdded(
@Nullable final Object transactionToken) {

if (transactionToken != null) {
final Transaction transaction;
synchronized (transactions) {
transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
final Transaction transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
// Store an immutable key
transaction.addlocationKey(toKeyImmutable(locationKey));
transaction.addLocationKey(toKeyImmutable(locationKey));
return;
}

Expand Down Expand Up @@ -335,9 +332,11 @@ protected void doInitialization() {
@Override
@NotNull
public final Collection<ImmutableTableLocationKey> getTableLocationKeys() {
final List<ImmutableTableLocationKey> result = new ArrayList<>(tableLocations.size());
tableLocations.keySet().forEach(key -> result.add((ImmutableTableLocationKey) key));
return result;
synchronized (tableLocations) {
// Note that we never store keys that aren't immutable.
// noinspection unchecked,rawtypes
return new ArrayList<>((Set<ImmutableTableLocationKey>) (Set) tableLocations.keySet());
}
}

@Override
Expand All @@ -363,6 +362,9 @@ public TableLocation getTableLocationIfPresent(@NotNull final TableLocationKey t
if (immutableKey == current) {
// Note, this may contend for the lock on tableLocations
tableLocations.add(current = makeTableLocation(immutableKey));
if (current instanceof AbstractTableLocation) {
((AbstractTableLocation) current).incrementReferenceCount();
}
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -418,27 +420,30 @@ protected void handleTableLocationKeyRemoved(
@NotNull final TableLocationKey locationKey,
@Nullable final Object transactionToken) {
if (transactionToken != null) {
final Transaction transaction;
synchronized (transactions) {
transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
final Transaction transaction = transactions.get(transactionToken);
if (transaction == null) {
throw new IllegalStateException(
"No transaction with token " + transactionToken + " is currently open.");
}
transaction.removeLocationKey(toKeyImmutable(locationKey));
return;
}

if (!supportsSubscriptions()) {
tableLocations.remove(locationKey);
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation instanceof AbstractTableLocation) {
((AbstractTableLocation) removedLocation).decrementReferenceCount();
}
return;
}

// If we're not in a transaction, we should push this key immediately.
synchronized (subscriptions) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
if (removedLocation instanceof AbstractTableLocation) {
((AbstractTableLocation) removedLocation).decrementReferenceCount();
}
if (subscriptions.deliverNotification(
Listener::handleTableLocationKeyRemoved,
locationKey.makeImmutable(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
@ReferentialIntegrity
private final Collection<DataIndex> retainedDataIndexes = new ArrayList<>();

private final List<AbstractTableLocation> locationsToClear;
private final UpdateCommitter<?> locationClearCommitter;

/**
* A reference to a delayed error notifier for the {@link #includedLocationsTable}, if one is pending.
*/
Expand Down Expand Up @@ -199,19 +196,6 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
}
}


locationsToClear = new ArrayList<>();
locationClearCommitter = new UpdateCommitter<>(this,
ExecutionContext.getContext().getUpdateGraph(),
(ignored) -> {
locationsToClear.forEach(location -> {
location.handleUpdate(null, System.currentTimeMillis());
location.clearColumnLocations();

});
locationsToClear.clear();
});

invalidateCommitter = new UpdateCommitter<>(this,
ExecutionContext.getContext().getUpdateGraph(),
(ignored) -> {
Expand All @@ -222,6 +206,16 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
});
}

@Override
public void destroy() {
super.destroy();
includedTableLocations.keySet().forEach(location -> {
if (location instanceof AbstractTableLocation) {
((AbstractTableLocation) location).decrementReferenceCount();
}
});
}

@Override
public synchronized void addLocation(@NotNull final TableLocation tableLocation) {
final IncludedTableLocationEntry includedLocation = includedTableLocations.get(tableLocation.getKey());
Expand All @@ -232,9 +226,6 @@ public synchronized void addLocation(@NotNull final TableLocation tableLocation)
log.debug().append("LOCATION_ADDED:").append(tableLocation.toString()).endl();
}
emptyTableLocations.add(new EmptyTableLocationEntry(tableLocation));
if (tableLocation instanceof AbstractTableLocation) {
((AbstractTableLocation) tableLocation).incrementReferenceCount();
}
} else {
// Duplicate location - not allowed
final TableLocation duplicateLocation =
Expand Down Expand Up @@ -420,6 +411,9 @@ private TableUpdateImpl update(final boolean initializing) {
includedTableLocations.add(entry);
orderedIncludedTableLocations.add(entry);
entry.processInitial(addedRowSetBuilder, entryToInclude.initialRowSet);
if (entry.location instanceof AbstractTableLocation) {
((AbstractTableLocation) entry.location).incrementReferenceCount();
}
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved

// We have a new location, add the row set to the table and mark the row as added.
// @formatter:off
Expand Down