feat: support refreshing Iceberg tables #5707

Open

Wants to merge 40 commits into base: main

Changes from 6 commits

Commits (40)
470b09c
Initial commit of refreshing Iceberg.
lbooker42 Jul 2, 2024
a8d957a
Rebased to main.
lbooker42 Jul 2, 2024
264fdb1
Change IcebergInstructions refreshing indicator to enum instead of bo…
lbooker42 Jul 2, 2024
58d0a73
WIP, for review
lbooker42 Jul 3, 2024
e090474
Manual and auto-refreshing working, better documentation.
lbooker42 Jul 23, 2024
57021ad
Addressed more PR comments, some remaining.
lbooker42 Jul 23, 2024
fb882e8
WIP, some PR comments addressed.
lbooker42 Jul 26, 2024
5bbdeb2
WIP, even more PR comments addressed.
lbooker42 Jul 27, 2024
3da205c
Nearly all PR comments addressed.
lbooker42 Jul 27, 2024
91acf9b
merged with main
lbooker42 Jul 27, 2024
08dd329
Adjustment to IcebergInstructions update mode.
lbooker42 Jul 29, 2024
7af0d1d
Added python wrapper for Iceberg refreshing tables.
lbooker42 Jul 29, 2024
2d79c38
Changes to mocked tests for ColumnSourceManager and PartitionAwareSou…
lbooker42 Jul 29, 2024
3809f21
Added DHError handler and add'l documentation to python `snapshots()`…
lbooker42 Jul 30, 2024
5273a15
Fixed typo in JavaDoc
lbooker42 Jul 30, 2024
b9e2c6e
WIP
lbooker42 Jul 31, 2024
9937f79
Suggestion from review
lbooker42 Jul 31, 2024
cd08038
WIP, changes to revert some transaction token code.
lbooker42 Jul 31, 2024
f28325f
Correct logic across multiple transactions.
lbooker42 Aug 21, 2024
2d92b3f
Merged with main
lbooker42 Aug 21, 2024
cd31d82
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 23, 2024
d680c0c
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 26, 2024
6607fc3
PR comments addressed.
lbooker42 Aug 28, 2024
893336f
Updated to use IcebergTableAdapter and exposed in python. Addressed P…
lbooker42 Aug 30, 2024
68e4546
Incorporated external PR to update PartitioningColumnDataIndex for re…
lbooker42 Aug 30, 2024
273f5c1
Added additional snapshots with removes to IcebergToolsTest resources.
lbooker42 Sep 3, 2024
1e92a19
Merge branch 'main' into lab-iceberg-refreshing
lbooker42 Sep 3, 2024
f72c1b7
Manual and auto refreshing tests for Iceberg.
lbooker42 Sep 4, 2024
5c7ff12
Manual and auto refreshing tests for Iceberg, not passing.
lbooker42 Sep 4, 2024
92eec61
PR comments addressed.
lbooker42 Sep 4, 2024
95194b7
Implemented improved location reference counting in AbstractTableLoca…
lbooker42 Sep 5, 2024
09e2b6e
Fixing doc problem.
lbooker42 Sep 5, 2024
30910dd
For review only, does not compile :(
lbooker42 Sep 12, 2024
dd12240
Compiles now, still many problems
lbooker42 Sep 13, 2024
944dac6
Working through problems.
lbooker42 Sep 13, 2024
912b9f2
Cleanup and minor changes
lbooker42 Sep 13, 2024
e2b6fd0
Refactored ATLP
lbooker42 Sep 18, 2024
72b03be
Merged with main.
lbooker42 Sep 18, 2024
01e50fe
Updated but RCSM still not referenced properly.
lbooker42 Sep 18, 2024
81e88d8
Refreshing tests still need work.
lbooker42 Sep 20, 2024
@@ -7,10 +7,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import org.jetbrains.annotations.NotNull;
@@ -53,7 +50,7 @@ public interface ColumnSourceManager extends LivenessReferent {
*
* @return The set of added row keys, to be owned by the caller
*/
WritableRowSet refresh();
TableUpdate refresh();

/**
* Advise this ColumnSourceManager that an error has occurred, and that it will no longer be {@link #refresh()
@@ -7,18 +7,15 @@
import io.deephaven.base.verify.Require;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.TableUpdateListener;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationProvider;
import io.deephaven.engine.table.impl.locations.TableLocationRemovedException;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.locations.impl.TableLocationSubscriptionBuffer;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.TestUseOnly;
import org.apache.commons.lang3.mutable.Mutable;
@@ -106,7 +103,6 @@ public abstract class SourceTable<IMPL_TYPE extends SourceTable<IMPL_TYPE>> exte
}

setRefreshing(isRefreshing);
setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, Boolean.TRUE);
}

/**
@@ -219,10 +215,6 @@ protected void instrumentedRefresh() {
final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate = locationBuffer.processPending();
final ImmutableTableLocationKey[] removedKeys =
maybeRemoveLocations(locationUpdate.getPendingRemovedLocationKeys());
if (removedKeys.length > 0) {
throw new TableLocationRemovedException("Source table does not support removed locations",
removedKeys);
}
maybeAddLocations(locationUpdate.getPendingAddedLocationKeys());

// This class previously had functionality to notify "location listeners", but it was never used.
@@ -232,13 +224,16 @@ protected void instrumentedRefresh() {
return;
}

final RowSet added = columnSourceManager.refresh();
if (added.isEmpty()) {
final TableUpdate update = columnSourceManager.refresh();
if (update.empty()) {
update.release();
return;
}

rowSet.insert(added);
notifyListeners(added, RowSetFactory.empty(), RowSetFactory.empty());
Assert.eqTrue(update.shifted().empty(), "update.shifted().empty()");
rowSet.remove(update.removed());
rowSet.insert(update.added());
notifyListeners(update);
}

@Override
@@ -26,21 +26,41 @@ public interface TableLocationProvider extends NamedImplementation {
*/
interface Listener extends BasicTableDataListener {

/**
* Begin a transaction that collects location key additions and removals to be processed atomically.
*
* @param token A token to identify the transaction.
*/
void beginTransaction(final Object token);

/**
* End the transaction and process the location changes.
*
* @param token A token to identify the transaction.
*/
void endTransaction(final Object token);

/**
* Notify the listener of a {@link TableLocationKey} encountered while initiating or maintaining the location
* subscription. This should occur at most once per location, but the order of delivery is <i>not</i>
@rcaudy (Member) commented on Jul 3, 2024:

Consider whether we can have add + remove + add. What about remove + add in the same pull?
Should document that this may change the "at most once per location" guarantee, and define semantics.
I think it should be something like:
We allow re-add of a removed TLK. Downstream consumers should process these in an order that respects delivery and transactionality.

Within one transaction, expect at most one of "remove" or "add" for a given TLK.
Within one transaction, we can allow remove followed by add, but not add followed by remove. This dictates that we deliver pending removes before pending adds in processPending.
That is, one transaction allows:

  1. Replace a TLK (remove followed by add)
  2. Remove a TLK (remove)
  3. Add a TLK (add)
Double add, double remove, or add followed by remove is right out.

Processing an addition to a transaction:

  1. Remove: If there's an existing accumulated remove, error. Else, if there's an existing accumulated add, error. Else, accumulate the remove.
  2. Add: If there's an existing accumulated add, error. Else, accumulate the add.

Across multiple transactions delivered as a batch, ensure that the right end-state is achieved.

  1. Add + remove collapses pairwise to no-op
  2. Remove + add (assuming prior add) should be processed in order. We might very well choose to not allow re-add at this time, I don't expect Iceberg to do this. If we do allow it, we need to be conscious that the removed location's region(s) need(s) to be used for previous data, while the added one needs to be used for current data.
  3. Multiple adds or removes without their opposite intervening are an error.

null token should be handled exactly the same as a single-element transaction.

Processing a transaction:

  1. Process removes first. If there's an add pending, then delete it and swallow the remove. Else, if there's a remove pending, error. Else, store the remove as pending.
  2. Process adds. If there's an add pending, error. Else, store the add as pending.

Note: removal support means that RegionedColumnSources may no longer be immutable! We need to be sure that we are aware of whether a particular TLP might remove data, and ensure that in those cases the RCS is not marked immutable. REVISED: ONLY REPLACE IS AN ISSUE FOR IMMUTABILITY, AS LONG AS WE DON'T REUSE SLOTS.

We discussed that TLPs should probably specify whether they are guaranteeing that they will never remove TLKs, and whether their TLs will never remove or modify rows. I think if and when we encounter data sources that require modify support, we should probably just use SourcePartitionedTable instead of PartitionAwareSourceTable.
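
A rough Java sketch of the accumulation rules proposed above; TransactionAccumulator and every name below are illustrative placeholders (K stands in for ImmutableTableLocationKey), not code from this PR.

import java.util.HashSet;
import java.util.Set;

// Illustrative only: accumulates adds/removes for one open transaction and folds them into
// cross-transaction pending sets following the rules proposed above.
final class TransactionAccumulator<K> {
    private final Set<K> txAdds = new HashSet<>();
    private final Set<K> txRemoves = new HashSet<>();

    // Within one transaction, at most one add per key; add followed by remove is not allowed.
    void accumulateAdd(final K key) {
        if (!txAdds.add(key)) {
            throw new IllegalStateException("Duplicate add within transaction: " + key);
        }
    }

    // Remove followed by add (replace) is allowed; add followed by remove and double remove are errors.
    void accumulateRemove(final K key) {
        if (txAdds.contains(key)) {
            throw new IllegalStateException("Remove after add within transaction: " + key);
        }
        if (!txRemoves.add(key)) {
            throw new IllegalStateException("Duplicate remove within transaction: " + key);
        }
    }

    // Fold this transaction into the pending batch: removes first, then adds.
    void commit(final Set<K> pendingAdds, final Set<K> pendingRemoves) {
        for (final K removed : txRemoves) {
            if (pendingAdds.remove(removed)) {
                continue; // a pending add followed by this remove collapses pairwise to a no-op
            }
            if (!pendingRemoves.add(removed)) {
                throw new IllegalStateException("Duplicate pending remove: " + removed);
            }
        }
        for (final K added : txAdds) {
            if (!pendingAdds.add(added)) {
                throw new IllegalStateException("Duplicate pending add: " + added);
            }
        }
        txAdds.clear();
        txRemoves.clear();
    }
}

Under this sketch, endTransaction(token) would call commit against the provider-level pending sets exactly once, and a null token would be treated as a single-element transaction.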

lbooker42 (Contributor, Author) replied:

I'm not sure if I need to handle the RCS immutability question in this PR since Iceberg will not modify rows.

Member replied:

Removing a region makes the values in the corresponding row key range disappear. That's OK for immutability.
If you allow a new region to use the same slot, or allow the old region to reincarnate in the same slot potentially with different data, you are violating immutability.

Not reusing slots means that a long-lived Iceberg table may eventually exhaust its row key space.

Member replied:

Replace (remove + add of a TLK) requires some kind of versioning of the TL, in a way that the TLK is aware of in order to ensure that we provide the table with the right TL for the version. AbstractTableLocationProvider's location caching layer is not currently sufficient for atomically replacing TLs.

* guaranteed.
*
* @param tableLocationKey The new table location key
* @param tableLocationKey The new table location key.
* @param transactionToken The token identifying the transaction.
*/
void handleTableLocationKey(@NotNull ImmutableTableLocationKey tableLocationKey);
void handleTableLocationKeyAdded(
@NotNull final ImmutableTableLocationKey tableLocationKey,
final Object transactionToken);

/**
* Notify the listener of a {@link TableLocationKey} that has been removed.
*
* @param tableLocationKey The table location key that was removed
* @param tableLocationKey The table location key that was removed.
* @param transactionToken The token identifying the transaction.
*/
void handleTableLocationKeyRemoved(@NotNull ImmutableTableLocationKey tableLocationKey);
void handleTableLocationKeyRemoved(
@NotNull final ImmutableTableLocationKey tableLocationKey,
final Object transactionToken);
}

/**
@@ -86,7 +106,7 @@ interface Listener extends BasicTableDataListener {
* that {@link #refresh()} or {@link #subscribe(Listener)} has been called prior to calls to the various table
* location fetch methods.
*
* @return this, to allow method chaining
* @return this, to allow method chaining.
*/
TableLocationProvider ensureInitialized();

Expand All @@ -95,21 +115,21 @@ interface Listener extends BasicTableDataListener {
* size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is
* guaranteed to succeed for all results.
*
* @return A collection of keys for locations available from this provider
* @return A collection of keys for locations available from this provider.
*/
@NotNull
Collection<ImmutableTableLocationKey> getTableLocationKeys();
Member commented:

I think we should consider preserving this method, @Deprecated. Mostly a question of how inconvenient just deleting it would be for @abaranec .


/**
* Check if this provider knows the supplied location key.
*
* @param tableLocationKey The key to test for
* @return Whether the key is known to this provider
* @param tableLocationKey The key to test.
* @return Whether the key is known to this provider.
*/
boolean hasTableLocationKey(@NotNull final TableLocationKey tableLocationKey);

/**
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get.
* @return The {@link TableLocation} matching the given key
*/
@NotNull
@@ -122,8 +142,8 @@ default TableLocation getTableLocation(@NotNull TableLocationKey tableLocationKe
}

/**
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get
* @return The {@link TableLocation} matching the given key if present, else null
* @param tableLocationKey A {@link TableLocationKey} specifying the location to get.
* @return The {@link TableLocation} matching the given key if present, else null.
*/
@Nullable
TableLocation getTableLocationIfPresent(@NotNull TableLocationKey tableLocationKey);
@@ -84,7 +84,9 @@ public final ImmutableTableKey getKey() {

@Override
protected final void deliverInitialSnapshot(@NotNull final TableLocationProvider.Listener listener) {
unmodifiableTableLocationKeys.forEach(listener::handleTableLocationKey);
listener.beginTransaction(this);
unmodifiableTableLocationKeys.forEach(tlk -> listener.handleTableLocationKeyAdded(tlk, this));
listener.endTransaction(this);
}

/**
@@ -110,13 +112,28 @@ protected final void handleTableLocationKey(@NotNull final TableLocationKey loca
visitLocationKey(locationKey);
if (locationCreatedRecorder) {
verifyPartitionKeys(locationKey);
if (subscriptions.deliverNotification(Listener::handleTableLocationKey, toKeyImmutable(result), true)) {
if (subscriptions.deliverNotification(
(listener, tlk) -> listener.handleTableLocationKeyAdded(tlk,
AbstractTableLocationProvider.this),
toKeyImmutable(result), true)) {
onEmpty();
}
}
}
}

protected final void beginTransaction() {
if (subscriptions != null) {
subscriptions.deliverNotification(listener -> listener.beginTransaction(this), true);
}
}

protected final void endTransaction() {
if (subscriptions != null) {
subscriptions.deliverNotification(listener -> listener.endTransaction(this), true);
}
}

/**
* Called <i>after</i> a table location has been visited by {@link #handleTableLocationKey(TableLocationKey)}, but
* before notifications have been delivered to any subscriptions, if applicable. The default implementation does
@@ -180,6 +197,13 @@ protected void doInitialization() {
@Override
@NotNull
public final Collection<ImmutableTableLocationKey> getTableLocationKeys() {
// We need to prevent reading the map (and maybe mutating it?) during a transaction.
// We could transition between two maps, a stable copy and a shadow copy that is being mutated.
// Or we could hold a bigger lock while mutating the map, and hold the same lock here. Sounds like a job for a
// read-write lock (e.g. ReentrantReadWriteLock), maybe. If you want `FunctionalLock`, the pattern (but mostly
// not the code) from io.deephaven.engine.updategraph.UpdateGraphLock could help.
// I think we need the read-write lock for correctness, and I think we need to make it explicit. That is, the
// user needs to be able to get a read lock and hold it while it's operating on the returned collection.
return unmodifiableTableLocationKeys;
}

@@ -215,8 +239,8 @@ public TableLocation getTableLocationIfPresent(@NotNull final TableLocationKey t
/**
* Remove a {@link TableLocationKey} and its corresponding {@link TableLocation} (if it was created). All
* subscribers to this TableLocationProvider will be
* {@link TableLocationProvider.Listener#handleTableLocationKeyRemoved(ImmutableTableLocationKey) notified}. If the
* TableLocation was created, all of its subscribers will additionally be
* {@link TableLocationProvider.Listener#handleTableLocationKeyRemoved(ImmutableTableLocationKey, Object)}
* notified}. If the TableLocation was created, all of its subscribers will additionally be
* {@link TableLocation.Listener#handleUpdate() notified} that it no longer exists. This TableLocationProvider will
* continue to update other locations and will no longer provide or request information about the removed location.
*
@@ -257,7 +281,8 @@ public void removeTableLocationKey(@NotNull final TableLocationKey locationKey)
protected void handleTableLocationKeyRemoved(@NotNull final ImmutableTableLocationKey locationKey) {
if (supportsSubscriptions()) {
synchronized (subscriptions) {
if (subscriptions.deliverNotification(Listener::handleTableLocationKeyRemoved, locationKey, true)) {
if (subscriptions.deliverNotification(
(listener, tlk) -> listener.handleTableLocationKeyRemoved(tlk, this), locationKey, true)) {
onEmpty();
}
}
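
The read-write-lock idea from the getTableLocationKeys() comment above might look roughly like the following; GuardedKeyStore, keysLock, and tableLocationKeys are hypothetical names for illustration, not part of this change.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: transactions mutate the key set under the write lock, while readers take the
// read lock so they never observe a half-applied transaction.
final class GuardedKeyStore<K> {
    private final ReentrantReadWriteLock keysLock = new ReentrantReadWriteLock();
    private final List<K> tableLocationKeys = new ArrayList<>();

    void applyTransaction(final Collection<K> added, final Collection<K> removed) {
        keysLock.writeLock().lock();
        try {
            tableLocationKeys.removeAll(removed);
            tableLocationKeys.addAll(added);
        } finally {
            keysLock.writeLock().unlock();
        }
    }

    Collection<K> getTableLocationKeys() {
        keysLock.readLock().lock();
        try {
            // Copying under the read lock keeps the lock internal; the alternative discussed above is to
            // hand the caller the read lock to hold while it operates on the returned collection.
            return new ArrayList<>(tableLocationKeys);
        } finally {
            keysLock.readLock().unlock();
        }
    }
}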
@@ -115,7 +115,7 @@ public void subscribe(@NotNull final Listener listener) {
p.subscribe(listener);
} else {
p.refresh();
p.getTableLocationKeys().forEach(listener::handleTableLocationKey);
p.getTableLocationKeys().forEach(tlk -> listener.handleTableLocationKeyAdded(tlk, this));
}
});
}
@@ -157,20 +157,42 @@ private FilteringListener(@NotNull final TableLocationProvider.Listener outputLi
}

@Override
public void handleTableLocationKey(@NotNull final ImmutableTableLocationKey tableLocationKey) {
public void beginTransaction(final Object token) {
// Delegate to the wrapped listener.
final TableLocationProvider.Listener outputListener = getWrapped();
if (outputListener != null) {
outputListener.beginTransaction(token);
}
}

@Override
public void endTransaction(final Object token) {
// Delegate to the wrapped listener.
final TableLocationProvider.Listener outputListener = getWrapped();
if (outputListener != null) {
outputListener.endTransaction(token);
}
}

@Override
public void handleTableLocationKeyAdded(
@NotNull final ImmutableTableLocationKey tableLocationKey,
final Object transactionToken) {
final TableLocationProvider.Listener outputListener = getWrapped();
// We can't try to clean up null listeners here, the underlying implementation may not allow concurrent
// unsubscribe operations.
if (outputListener != null && locationKeyFilter.accept(tableLocationKey)) {
outputListener.handleTableLocationKey(tableLocationKey);
outputListener.handleTableLocationKeyAdded(tableLocationKey, transactionToken);
}
}

@Override
public void handleTableLocationKeyRemoved(@NotNull final ImmutableTableLocationKey tableLocationKey) {
public void handleTableLocationKeyRemoved(
@NotNull final ImmutableTableLocationKey tableLocationKey,
final Object transactionToken) {
final TableLocationProvider.Listener outputListener = getWrapped();
if (outputListener != null && locationKeyFilter.accept(tableLocationKey)) {
outputListener.handleTableLocationKeyRemoved(tableLocationKey);
outputListener.handleTableLocationKeyRemoved(tableLocationKey, transactionToken);
}
}

@@ -3,14 +3,14 @@
//
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.engine.table.impl.locations.TableLocationProvider;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashSet;
import java.util.Set;

/**
* Polling-driven {@link TableLocationProvider} implementation that delegates {@link TableLocationKey location key}
* discovery to a {@link TableLocationKeyFinder} and {@link TableLocation location} creation to a
@@ -46,9 +46,21 @@ public String getImplementationName() {
return IMPLEMENTATION_NAME;
}

// The simplest way to support "push" of new data availability is to provide a callback to the user that just calls
// `refresh`, which would need to become synchronized. Alternatively, we could make an Iceberg-specific aTLP
// implementation that exposes a more specific callback, e.g. with a snapshot ID, as well as the option to disable
// polling. We do need a mechanism to avoid going backwards, probably.
@Override
public void refresh() {
locationKeyFinder.findKeys(this::handleTableLocationKey);
beginTransaction();
final Set<ImmutableTableLocationKey> missedKeys = new HashSet<>(getTableLocationKeys());
locationKeyFinder.findKeys(tableLocationKey -> {
// noinspection SuspiciousMethodCalls
missedKeys.remove(tableLocationKey);
handleTableLocationKey(tableLocationKey);
});
missedKeys.forEach(this::handleTableLocationKeyRemoved);
endTransaction();
setInitialized();
}
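
For the "avoid going backwards" concern in the comment above, one sketch is to remember the newest snapshot already processed and refresh only on strictly newer ones; SnapshotRefreshGuard and its callback are hypothetical and not part of this PR.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a push-style callback that triggers refresh() for strictly newer snapshots and
// ignores notifications that would move the table backwards.
final class SnapshotRefreshGuard {
    private final AtomicLong lastSeenSnapshotSequence = new AtomicLong(Long.MIN_VALUE);
    private final Runnable refresh;

    SnapshotRefreshGuard(final Runnable refresh) {
        this.refresh = refresh;
    }

    void onSnapshot(final long snapshotSequenceNumber) {
        // Atomically advance the high-water mark; only a call that actually advances it refreshes.
        final long previous = lastSeenSnapshotSequence.getAndAccumulate(snapshotSequenceNumber, Math::max);
        if (snapshotSequenceNumber > previous) {
            refresh.run();
        }
    }
}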
