Updated to use IcebergTableAdapter and exposed in python. Addressed PR comments.
lbooker42 committed Aug 30, 2024
1 parent 6607fc3 commit 893336f
Showing 18 changed files with 925 additions and 570 deletions.
AbstractTableLocation.java
@@ -12,6 +12,7 @@
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.referencecounting.ReferenceCounted;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -35,6 +36,8 @@ public abstract class AbstractTableLocation
private final KeyedObjectHashMap<CharSequence, ColumnLocation> columnLocations =
new KeyedObjectHashMap<>(StringUtils.charSequenceKey());

private final ReferenceCounted referenceCounted;

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<AbstractTableLocation, KeyedObjectHashMap> CACHED_DATA_INDEXES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
@@ -58,6 +61,15 @@ protected AbstractTableLocation(@NotNull final TableKey tableKey,
super(supportsSubscriptions);
this.tableKey = Require.neqNull(tableKey, "tableKey").makeImmutable();
this.tableLocationKey = Require.neqNull(tableLocationKey, "tableLocationKey").makeImmutable();

referenceCounted = new ReferenceCounted() {
@Override
protected void onReferenceCountAtZero() {
// Call the location's onReferenceCountAtZero method
AbstractTableLocation.this.onReferenceCountAtZero();
}
};

}

@Override
@@ -158,7 +170,7 @@ public final ColumnLocation getColumnLocation(@NotNull final CharSequence name)
* Clear all column locations (usually because a truncated location was observed).
*/
@SuppressWarnings("unused")
protected final void clearColumnLocations() {
public final void clearColumnLocations() {
columnLocations.clear();
}

@@ -229,4 +241,34 @@ public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
@InternalUseOnly
@Nullable
public abstract BasicDataIndex loadDataIndex(@NotNull String... columns);

// ------------------------------------------------------------------------------------------------------------------
// Reference counting implementation
// ------------------------------------------------------------------------------------------------------------------

/**
* Increment the reference count by one.
*
* @throws IllegalStateException If the reference count was not successfully incremented
*/
public final void incrementReferenceCount() {
referenceCounted.incrementReferenceCount();
}

/**
* Decrement the reference count by one; when the reference count reaches zero, this location will be cleared.
*
* @throws IllegalStateException If the reference count was not successfully decremented
*/
public void decrementReferenceCount() {
referenceCounted.decrementReferenceCount();
}

/**
* The reference count has reached zero; we can clear this location and release any resources.
*/
private void onReferenceCountAtZero() {
handleUpdate(null, System.currentTimeMillis());
clearColumnLocations();
}
}
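
The new lifecycle hinges on the delegation shown above: AbstractTableLocation embeds an anonymous ReferenceCounted whose onReferenceCountAtZero() calls back into the owning location, so cleanup runs exactly once, when the last reference is dropped. Below is a minimal stand-alone sketch of the same pattern, for illustration only; ResourceHolder and its release() method are hypothetical and not part of this commit.

import io.deephaven.util.referencecounting.ReferenceCounted;

// Hypothetical owner demonstrating the delegation pattern used by
// AbstractTableLocation: an embedded ReferenceCounted forwards
// onReferenceCountAtZero() back to the owner, which then releases its resources.
public class ResourceHolder {

    private final ReferenceCounted referenceCounted = new ReferenceCounted() {
        @Override
        protected void onReferenceCountAtZero() {
            ResourceHolder.this.release();
        }
    };

    /** Take a reference; each call must be paired with releaseReference(). */
    public void acquireReference() {
        referenceCounted.incrementReferenceCount();
    }

    /** Drop a reference; the final drop triggers release() exactly once. */
    public void releaseReference() {
        referenceCounted.decrementReferenceCount();
    }

    private void release() {
        System.out.println("last reference dropped, releasing resources");
    }
}

In this commit the owner is AbstractTableLocation itself, and the reference holder is RegionedColumnSourceManager, which increments the count when a location is added and decrements it when the corresponding region is invalidated (see the RegionedColumnSourceManager changes below).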
AbstractTableLocationProvider.java
@@ -3,10 +3,8 @@
//
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.locations.*;
import io.deephaven.engine.updategraph.UpdateCommitter;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import org.jetbrains.annotations.NotNull;
@@ -83,9 +81,6 @@ void removeLocationKey(TableLocationKey locationKey) {
(Collection<ImmutableTableLocationKey>) (Collection<? extends TableLocationKey>) Collections
.unmodifiableCollection(tableLocations.keySet());

final List<AbstractTableLocation> locationsToClear;
final UpdateCommitter<?> locationClearCommitter;

private volatile boolean initialized;

private List<String> partitionKeys;
@@ -101,18 +96,6 @@ protected AbstractTableLocationProvider(@NotNull final TableKey tableKey, final
super(supportsSubscriptions);
this.tableKey = tableKey.makeImmutable();
this.partitionKeys = null;

locationsToClear = new ArrayList<>();
locationClearCommitter = new UpdateCommitter<>(this,
ExecutionContext.getContext().getUpdateGraph(),
(ignored) -> {
locationsToClear.forEach(location -> {
location.handleUpdate(null, System.currentTimeMillis());
location.clearColumnLocations();

});
locationsToClear.clear();
});
}

/**
@@ -195,7 +178,6 @@ protected void endTransaction(@NotNull final Object token) {
for (TableLocationKey locationKey : transaction.locationsRemoved) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
maybeClearLocationForRemoval(removedLocation);
removedImmutableKeys.add(toKeyImmutable(locationKey));
}
}
@@ -432,7 +414,6 @@ protected void handleTableLocationKeyRemoved(
@NotNull final TableLocationKey locationKey,
@Nullable final Object transactionToken) {
if (!supportsSubscriptions()) {
maybeClearLocationForRemoval(tableLocations.remove(locationKey));
return;
}

@@ -457,7 +438,6 @@ protected void handleTableLocationKeyRemoved(
synchronized (subscriptions) {
final Object removedLocation = tableLocations.remove(locationKey);
if (removedLocation != null) {
maybeClearLocationForRemoval(removedLocation);
if (subscriptions.deliverNotification(
Listener::handleTableLocationKeyRemoved,
locationKey.makeImmutable(),
@@ -468,13 +448,6 @@
}
}

private synchronized void maybeClearLocationForRemoval(@Nullable final Object removedLocation) {
if (removedLocation instanceof AbstractTableLocation) {
locationsToClear.add((AbstractTableLocation) removedLocation);
locationClearCommitter.maybeActivate();
}
}

private void verifyPartitionKeys(@NotNull final TableLocationKey locationKey) {
if (partitionKeys == null) {
partitionKeys = new ArrayList<>(locationKey.getPartitionKeys());
RegionedColumnSourceManager.java
@@ -15,6 +15,7 @@
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocation;
import io.deephaven.engine.table.impl.locations.impl.TableLocationUpdateSubscriptionBuffer;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ObjectArraySource;
@@ -103,6 +104,9 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
@ReferentialIntegrity
private final Collection<DataIndex> retainedDataIndexes = new ArrayList<>();

private final List<AbstractTableLocation> locationsToClear;
private final UpdateCommitter<?> locationClearCommitter;

/**
* A reference to a delayed error notifier for the {@link #includedLocationsTable}, if one is pending.
*/
@@ -192,6 +196,19 @@ public class RegionedColumnSourceManager extends LivenessArtifact implements Col
}
}


locationsToClear = new ArrayList<>();
locationClearCommitter = new UpdateCommitter<>(this,
ExecutionContext.getContext().getUpdateGraph(),
(ignored) -> {
locationsToClear.forEach(location -> {
location.handleUpdate(null, System.currentTimeMillis());
location.clearColumnLocations();

});
locationsToClear.clear();
});

invalidateCommitter = new UpdateCommitter<>(this,
ExecutionContext.getContext().getUpdateGraph(),
(ignored) -> {
@@ -212,6 +229,9 @@ public synchronized void addLocation(@NotNull final TableLocation tableLocation)
log.debug().append("LOCATION_ADDED:").append(tableLocation.toString()).endl();
}
emptyTableLocations.add(new EmptyTableLocationEntry(tableLocation));
if (tableLocation instanceof AbstractTableLocation) {
((AbstractTableLocation) tableLocation).incrementReferenceCount();
}
} else {
// Duplicate location - not allowed
final TableLocation duplicateLocation =
@@ -656,6 +676,9 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {

private void invalidate() {
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
if (location instanceof AbstractTableLocation) {
((AbstractTableLocation) location).decrementReferenceCount();
}
}

@Override
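
The queue-and-commit machinery removed from AbstractTableLocationProvider reappears here: a list of locations to clear plus an UpdateCommitter that later runs handleUpdate(null, ...) and clearColumnLocations() for each queued location and then empties the list. Below is a rough, self-contained illustration of that collect-then-flush shape; it is a plain Java stand-in only, since the real mechanism is UpdateCommitter, which needs a live ExecutionContext and update graph.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the deferred-clear pattern above: cleanup actions are
// queued as they arise and all run together in a single, later flush, analogous
// to the UpdateCommitter firing on a subsequent update-graph cycle.
public final class DeferredClearExample {

    private final List<Runnable> pendingClears = new ArrayList<>();

    /** Queue cleanup for a removed location instead of running it inline. */
    public void locationRemoved(final String locationName) {
        pendingClears.add(() -> System.out.println("clearing " + locationName));
    }

    /** Run all queued cleanup at a safe point, then reset the queue. */
    public void flush() {
        pendingClears.forEach(Runnable::run);
        pendingClears.clear();
    }

    public static void main(final String[] args) {
        final DeferredClearExample example = new DeferredClearExample();
        example.locationRemoved("location-A");
        example.locationRemoved("location-B");
        example.flush(); // both clears run here, not at removal time
    }
}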
1 change: 1 addition & 0 deletions extensions/iceberg/s3/build.gradle
@@ -1,6 +1,7 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
id 'io.deephaven.hadoop-common-dependencies'
}

evaluationDependsOn Docker.registryProject('localstack')
io.deephaven.iceberg.util (S3 / Glue catalog factory)
@@ -4,6 +4,7 @@
package io.deephaven.iceberg.util;

import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
@@ -68,14 +69,11 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

// final org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.setConf(new Configuration());
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog);
}

/**
@@ -103,11 +101,10 @@ public static IcebergCatalogAdapter createGlue(
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
catalog.setConf(new Configuration());
catalog.initialize(catalogName, properties);

return new IcebergCatalogAdapter(catalog, fileIO);
return new IcebergCatalogAdapter(catalog);
}
}
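
With this change, both factory methods return an IcebergCatalogAdapter built from the catalog alone; the FileIO previously loaded via CatalogUtil.loadFileIO is no longer passed to the adapter. Below is a hedged usage sketch: it assumes the factory class shown above is IcebergToolsS3 (the class name is not visible in this excerpt) and that createGlue takes (name, catalogURI, warehouseLocation) in that order; the endpoint and bucket values are placeholders.

import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergToolsS3;

// Assumed usage sketch, not a definitive API reference: builds an adapter
// against an AWS Glue catalog. Running it requires valid AWS credentials and
// reachable Glue/S3 endpoints.
public class IcebergGlueAdapterExample {
    public static void main(final String[] args) {
        final IcebergCatalogAdapter adapter = IcebergToolsS3.createGlue(
                "my-glue-catalog",                      // name; null falls back to "IcebergCatalog-" + catalogURI
                "https://glue.us-east-1.amazonaws.com", // catalogURI (placeholder)
                "s3://my-bucket/warehouse");            // warehouseLocation (placeholder)
        System.out.println("Created catalog adapter: " + adapter);
    }
}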
(Diff for the remaining changed files not shown.)
