Skip to content

Commit

Permalink
feat!: Updated API for Iceberg read operations (#6268)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Oct 28, 2024
1 parent fa27a8e commit 703f527
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 336 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
Expand Down Expand Up @@ -40,7 +40,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
/**
* The instructions for customizations while reading.
*/
final IcebergInstructions instructions;
final IcebergReadInstructions instructions;

/**
* A cache of {@link IcebergTableLocationKey IcebergTableLocationKeys} keyed by the URI of the file they represent.
Expand Down Expand Up @@ -83,7 +83,7 @@ protected IcebergTableLocationKey locationKey(
}
}

// Add the data instructions if provided as part of the IcebergInstructions.
// Add the data instructions if provided as part of the IcebergReadInstructions.
if (instructions.dataInstructions().isPresent()) {
builder.setSpecialInstructions(instructions.dataInstructions().get());
} else {
Expand All @@ -104,20 +104,17 @@ protected IcebergTableLocationKey locationKey(

/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
* @param instructions The instructions for customizations while reading.
*/
public IcebergBaseLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
this.snapshot = tableSnapshot;
this.snapshot = tableAdapter.getSnapshot(instructions);
this.instructions = instructions;
this.dataInstructionsProvider = dataInstructionsProvider;

this.tableDef = tableAdapter.definition(tableSnapshot, instructions);
this.tableDef = tableAdapter.definition(instructions);

this.cache = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@

import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;

Expand All @@ -21,15 +20,13 @@
public final class IcebergFlatLayout extends IcebergBaseLayout {
/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
* @param instructions The instructions for customizations while reading.
*/
public IcebergFlatLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);
super(tableAdapter, instructions, dataInstructionsProvider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.util.type.TypeUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.*;
Expand All @@ -41,17 +40,15 @@ public ColumnData(String name, Class<?> type, int index) {

/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
* @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
* @param instructions The instructions for customizations while reading.
*/
public IcebergKeyValuePartitionedLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@Nullable final Snapshot tableSnapshot,
@NotNull final PartitionSpec partitionSpec,
@NotNull final IcebergInstructions instructions,
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);
super(tableAdapter, instructions, dataInstructionsProvider);

// We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
// in the output definition, so we can ignore duplicates.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.util;

import io.deephaven.annotations.CopyableStyle;
import io.deephaven.engine.table.TableDefinition;
import org.apache.iceberg.Snapshot;
import org.immutables.value.Value;
import org.immutables.value.Value.Immutable;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

/**
* This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in
* this class may change in the future. As such, callers may wish to explicitly set the values.
*/
@Immutable
@CopyableStyle
public abstract class IcebergReadInstructions {
/**
* The default {@link IcebergReadInstructions} to use when reading Iceberg data files. Providing this will use
* system defaults for cloud provider-specific parameters
*/
@SuppressWarnings("unused")
public static final IcebergReadInstructions DEFAULT = builder().build();

public static Builder builder() {
return ImmutableIcebergReadInstructions.builder();
}

/**
* The {@link TableDefinition} to use when reading Iceberg data files.
*/
public abstract Optional<TableDefinition> tableDefinition();

/**
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
* provider-specific instructions).
*/
public abstract Optional<Object> dataInstructions();

/**
* A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg
* data files.
*/
public abstract Map<String, String> columnRenames();

/**
* Return a copy of this instructions object with the column renames replaced by {@code entries}.
*/
public abstract IcebergReadInstructions withColumnRenames(Map<String, ? extends String> entries);

/**
* The {@link IcebergUpdateMode} mode to use when reading the Iceberg data files. Default is
* {@link IcebergUpdateMode#staticMode()}.
*/
@Value.Default
public IcebergUpdateMode updateMode() {
return IcebergUpdateMode.staticMode();
}

/**
* The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the
* {@link Snapshot#snapshotId()} should match this. Otherwise, only one of them should be provided. If neither is
* provided, the latest snapshot will be loaded.
*/
public abstract OptionalLong snapshotId();

/**
* Return a copy of this instructions object with the snapshot ID replaced by {@code value}.
*/
public abstract IcebergReadInstructions withSnapshotId(long value);

/**
* The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the
* {@link Snapshot#snapshotId()} should match the {@link #snapshotId()}. Otherwise, only one of them should be
* provided. If neither is provided, the latest snapshot will be loaded.
*/
public abstract Optional<Snapshot> snapshot();

/**
* Return a copy of this instructions object with the snapshot replaced by {@code value}.
*/
public abstract IcebergReadInstructions withSnapshot(Snapshot value);

public interface Builder {
Builder tableDefinition(TableDefinition tableDefinition);

Builder dataInstructions(Object s3Instructions);

Builder putColumnRenames(String key, String value);

Builder putAllColumnRenames(Map<String, ? extends String> entries);

Builder updateMode(IcebergUpdateMode updateMode);

Builder snapshotId(long snapshotId);

Builder snapshot(Snapshot snapshot);

IcebergReadInstructions build();
}

@Value.Check
final void checkSnapshotId() {
if (snapshotId().isPresent() && snapshot().isPresent() &&
snapshotId().getAsLong() != snapshot().get().snapshotId()) {
throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " +
"must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public interface IcebergTable extends Table {
/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with the latest snapshot from
* the catalog.
* <p>
Expand All @@ -18,7 +18,7 @@ public interface IcebergTable extends Table {
void update();

/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
* the catalog. If the {@code snapshotId} is not found in the list of snapshots for the table, an
* {@link IllegalArgumentException} is thrown. The input snapshot must also be newer (higher in sequence number)
Expand All @@ -31,7 +31,7 @@ public interface IcebergTable extends Table {
void update(final long snapshotId);

/**
* When the {@link IcebergInstructions#updateMode() update mode} for this table is
* When the {@link IcebergReadInstructions#updateMode() update mode} for this table is
* {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from
* the catalog. The input snapshot must be newer (higher in sequence number) than the current snapshot or an
* {@link IllegalArgumentException} is thrown.
Expand Down
Loading

0 comments on commit 703f527

Please sign in to comment.