diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java index 180b540d768..f07bbc1063a 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java @@ -97,7 +97,7 @@ public abstract class IcebergToolsTest { ColumnDefinition.ofString("ColumnType"), ColumnDefinition.ofBoolean("IsPartitioning")); - IcebergInstructions instructions; + IcebergReadInstructions instructions; public abstract S3AsyncClient s3AsyncClient(); @@ -129,7 +129,7 @@ public void setUp() throws ExecutionException, InterruptedException { final S3Instructions s3Instructions = s3Instructions(S3Instructions.builder()).build(); - instructions = IcebergInstructions.builder() + instructions = IcebergReadInstructions.builder() .dataInstructions(s3Instructions) .build(); } @@ -327,11 +327,6 @@ public void testOpenTableS3Only() throws ExecutionException, InterruptedExceptio public void testOpenTableDefinition() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() - .tableDefinition(SALES_PARTITIONED_DEFINITION) - .dataInstructions(instructions.dataInstructions().get()) - .build(); - final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final IcebergTableAdapter tableAdapter = adapter.loadTable("sales.sales_partitioned"); final io.deephaven.engine.table.Table table = tableAdapter.table(instructions); @@ -352,7 +347,7 @@ public void testOpenTablePartitionTypeException() { ColumnDefinition.ofLong("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() 
.tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -386,7 +381,7 @@ public void testOpenTableDefinitionRename() throws ExecutionException, Interrupt ColumnDefinition.ofDouble("UnitPrice"), ColumnDefinition.ofTime("OrderDate")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(renamed) .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "RegionName") @@ -420,7 +415,7 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -434,6 +429,7 @@ public void testSkippedPartitioningColumn() throws ExecutionException, Interrupt Assert.equals(table.getDefinition(), "table.getDefinition()", tableDef); } + @Test public void testReorderedPartitioningColumn() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); @@ -447,7 +443,7 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -465,7 +461,7 @@ public void testReorderedPartitioningColumn() throws ExecutionException, Interru public void testZeroPartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { 
uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(SALES_MULTI_DEFINITION) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -490,7 +486,7 @@ public void testIncorrectPartitioningColumns() throws ExecutionException, Interr ColumnDefinition.ofDouble("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -523,7 +519,7 @@ public void testMissingPartitioningColumns() { ColumnDefinition.ofLong("Unit_Price"), ColumnDefinition.ofTime("Order_Date")); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -548,7 +544,7 @@ public void testMissingPartitioningColumns() { public void testOpenTableColumnRename() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "RegionName") .putColumnRenames("Item_Type", "ItemType") @@ -566,7 +562,7 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx public void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesRenamed(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions 
localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .build(); @@ -584,7 +580,7 @@ public void testOpenTableColumnLegalizationRename() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesRenamed(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Item&Type", "Item_Type") .putColumnRenames("Units/Sold", "Units_Sold") @@ -613,7 +609,7 @@ public void testOpenTableColumnLegalizationPartitionException() { ColumnDefinition.ofInt("Year").withPartitioning(), ColumnDefinition.ofInt("Month").withPartitioning()); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .tableDefinition(tableDef) .putColumnRenames("Year", "Current Year") .putColumnRenames("Month", "Current Month") @@ -641,7 +637,7 @@ public void testOpenTableColumnRenamePartitioningColumns() throws ExecutionException, InterruptedException, TimeoutException { uploadSalesPartitioned(); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("VendorID", "vendor_id") .putColumnRenames("month", "__month") @@ -676,32 +672,32 @@ public void testOpenTableSnapshot() throws ExecutionException, InterruptedExcept // Verify we retrieved all the rows. 
final io.deephaven.engine.table.Table table0 = - tableAdapter.table(snapshots.get(0).snapshotId(), instructions); + tableAdapter.table(instructions.withSnapshotId(snapshots.get(0).snapshotId())); Assert.eq(table0.size(), "table0.size()", 18073, "expected rows in the table"); Assert.equals(table0.getDefinition(), "table0.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table1 = - tableAdapter.table(snapshots.get(1).snapshotId(), instructions); + tableAdapter.table(instructions.withSnapshotId(snapshots.get(1).snapshotId())); Assert.eq(table1.size(), "table1.size()", 54433, "expected rows in the table"); Assert.equals(table1.getDefinition(), "table1.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table2 = - tableAdapter.table(snapshots.get(2).snapshotId(), instructions); + tableAdapter.table(instructions.withSnapshotId(snapshots.get(2).snapshotId())); Assert.eq(table2.size(), "table2.size()", 72551, "expected rows in the table"); Assert.equals(table2.getDefinition(), "table2.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table3 = - tableAdapter.table(snapshots.get(3).snapshotId(), instructions); + tableAdapter.table(instructions.withSnapshotId(snapshots.get(3).snapshotId())); Assert.eq(table3.size(), "table3.size()", 100_000, "expected rows in the table"); Assert.equals(table3.getDefinition(), "table3.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table4 = - tableAdapter.table(snapshots.get(4).snapshotId(), instructions); + tableAdapter.table(instructions.withSnapshotId(snapshots.get(4).snapshotId())); Assert.eq(table4.size(), "table4.size()", 100_000, "expected rows in the table"); Assert.equals(table4.getDefinition(), "table4.getDefinition()", SALES_MULTI_DEFINITION); final io.deephaven.engine.table.Table table5 = - tableAdapter.table(snapshots.get(5).snapshotId(), instructions); + 
tableAdapter.table(instructions.withSnapshotId(snapshots.get(5).snapshotId())); Assert.eq(table5.size(), "table5.size()", 0, "expected rows in the table"); Assert.equals(table5.getDefinition(), "table5.getDefinition()", SALES_MULTI_DEFINITION); } @@ -715,32 +711,32 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx final List snapshots = tableAdapter.listSnapshots(); // Verify we retrieved all the rows. - io.deephaven.engine.table.Table table0 = tableAdapter.table(snapshots.get(0), instructions); + io.deephaven.engine.table.Table table0 = tableAdapter.table(instructions.withSnapshot(snapshots.get(0))); Assert.eq(table0.size(), "table0.size()", 18073, "expected rows in the table"); Assert.equals(table0.getDefinition(), "table0.getDefinition()", SALES_MULTI_DEFINITION); - io.deephaven.engine.table.Table table1 = tableAdapter.table(snapshots.get(1), instructions); + io.deephaven.engine.table.Table table1 = tableAdapter.table(instructions.withSnapshot(snapshots.get(1))); Assert.eq(table1.size(), "table1.size()", 54433, "expected rows in the table"); Assert.equals(table1.getDefinition(), "table1.getDefinition()", SALES_MULTI_DEFINITION); - io.deephaven.engine.table.Table table2 = tableAdapter.table(snapshots.get(2), instructions); + io.deephaven.engine.table.Table table2 = tableAdapter.table(instructions.withSnapshot(snapshots.get(2))); Assert.eq(table2.size(), "table2.size()", 72551, "expected rows in the table"); Assert.equals(table2.getDefinition(), "table2.getDefinition()", SALES_MULTI_DEFINITION); - io.deephaven.engine.table.Table table3 = tableAdapter.table(snapshots.get(3), instructions); + io.deephaven.engine.table.Table table3 = tableAdapter.table(instructions.withSnapshot(snapshots.get(3))); Assert.eq(table3.size(), "table3.size()", 100_000, "expected rows in the table"); Assert.equals(table3.getDefinition(), "table3.getDefinition()", SALES_MULTI_DEFINITION); - io.deephaven.engine.table.Table table4 = 
tableAdapter.table(snapshots.get(4), instructions); + io.deephaven.engine.table.Table table4 = tableAdapter.table(instructions.withSnapshot(snapshots.get(4))); Assert.eq(table4.size(), "table4.size()", 100_000, "expected rows in the table"); Assert.equals(table4.getDefinition(), "table4.getDefinition()", SALES_MULTI_DEFINITION); - io.deephaven.engine.table.Table table5 = tableAdapter.table(snapshots.get(5), instructions); + io.deephaven.engine.table.Table table5 = tableAdapter.table(instructions.withSnapshot(snapshots.get(5))); Assert.eq(table5.size(), "table5.size()", 0, "expected rows in the table"); Assert.equals(table5.getDefinition(), "table5.getDefinition()", SALES_MULTI_DEFINITION); try { - io.deephaven.engine.table.Table missing = tableAdapter.table(987654321L, instructions); + io.deephaven.engine.table.Table missing = tableAdapter.table(instructions.withSnapshotId(987654321L)); Assert.statementNeverExecuted("Expected an exception for invalid snapshot"); } catch (final Exception e) { Assert.assertion(e instanceof IllegalArgumentException, "e instanceof IllegalArgumentException"); @@ -776,11 +772,15 @@ public void testTableDefinition() { Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); // Use string and long snapshot ID - tableDef = tableAdapter.definition(snapshots.get(0).snapshotId(), null); + tableDef = tableAdapter.definition(IcebergReadInstructions.builder() + .snapshotId(snapshots.get(0).snapshotId()) + .build()); Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); // Use TableIdentifier and Snapshot - tableDef = tableAdapter.definition(snapshots.get(0), null); + tableDef = tableAdapter.definition(IcebergReadInstructions.builder() + .snapshot(snapshots.get(0)) + .build()); Assert.equals(tableDef, "tableDef", SALES_MULTI_DEFINITION); } @@ -825,13 +825,14 @@ public void testTableDefinitionTable() { Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); // Use string and long snapshot ID - 
tableDefTable = tableAdapter.definitionTable(snapshots.get(0).snapshotId(), null); + tableDefTable = tableAdapter + .definitionTable(IcebergReadInstructions.DEFAULT.withSnapshotId(snapshots.get(0).snapshotId())); Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "expected rows in the table"); Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); // Use TableIdentifier and Snapshot - tableDefTable = tableAdapter.definitionTable(snapshots.get(0), null); + tableDefTable = tableAdapter.definitionTable(IcebergReadInstructions.DEFAULT.withSnapshot(snapshots.get(0))); Assert.eq(tableDefTable.size(), "tableDefTable.size()", 5, "expected rows in the table"); Assert.equals(tableDefTable.getDefinition(), "tableDefTable.getDefinition()", META_DEF); @@ -842,7 +843,7 @@ public void testTableDefinitionWithInstructions() { final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); final IcebergTableAdapter tableAdapter = adapter.loadTable("sales.sales_multi"); - IcebergInstructions localInstructions = IcebergInstructions.builder() + IcebergReadInstructions localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .putColumnRenames("Region", "Area") .putColumnRenames("Item_Type", "ItemType") @@ -869,7 +870,7 @@ public void testTableDefinitionWithInstructions() { ColumnDefinition.ofString("Item_Type"), ColumnDefinition.ofTime("Order_Date")); - localInstructions = IcebergInstructions.builder() + localInstructions = IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .tableDefinition(userTableDef) .build(); @@ -885,7 +886,7 @@ public void testManualRefreshingTable() throws ExecutionException, InterruptedEx final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog); - final IcebergInstructions localInstructions = IcebergInstructions.builder() + final IcebergReadInstructions localInstructions = 
IcebergReadInstructions.builder() .dataInstructions(instructions.dataInstructions().get()) .updateMode(IcebergUpdateMode.manualRefreshingMode()) .build(); @@ -894,7 +895,8 @@ public void testManualRefreshingTable() throws ExecutionException, InterruptedEx final List snapshots = tableAdapter.listSnapshots(); final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); - final IcebergTableImpl table = (IcebergTableImpl) tableAdapter.table(snapshots.get(0), localInstructions); + final IcebergTableImpl table = + (IcebergTableImpl) tableAdapter.table(localInstructions.withSnapshot(snapshots.get(0))); // Initial size Assert.eq(table.size(), "table.size()", 18073, "expected rows in the table"); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index d49b394438b..970ad673581 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -10,7 +10,7 @@ import io.deephaven.iceberg.location.IcebergTableLocationKey; import io.deephaven.iceberg.location.IcebergTableParquetLocationKey; import io.deephaven.iceberg.relative.RelativeFileIO; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; @@ -40,7 +40,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder(); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java index e3b64d5e4d8..672bbf114fd 100644 --- 
a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java @@ -5,12 +5,11 @@ import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.iceberg.location.IcebergTableLocationKey; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import io.deephaven.iceberg.util.IcebergTableAdapter; import org.apache.iceberg.*; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.net.URI; @@ -21,15 +20,13 @@ public final class IcebergFlatLayout extends IcebergBaseLayout { /** * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table. - * @param tableSnapshot The {@link Snapshot} from which to discover data files. * @param instructions The instructions for customizations while reading. 
*/ public IcebergFlatLayout( @NotNull final IcebergTableAdapter tableAdapter, - @Nullable final Snapshot tableSnapshot, - @NotNull final IcebergInstructions instructions, + @NotNull final IcebergReadInstructions instructions, @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { - super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider); + super(tableAdapter, instructions, dataInstructionsProvider); } @Override diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index c5787d5454c..2b872dcae45 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -7,14 +7,13 @@ import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.iceberg.location.IcebergTableLocationKey; -import io.deephaven.iceberg.util.IcebergInstructions; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; import io.deephaven.util.type.TypeUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.iceberg.*; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.net.URI; import java.util.*; @@ -41,17 +40,15 @@ public ColumnData(String name, Class type, int index) { /** * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table. - * @param tableSnapshot The {@link Snapshot} from which to discover data files. * @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table. 
* @param instructions The instructions for customizations while reading. */ public IcebergKeyValuePartitionedLayout( @NotNull final IcebergTableAdapter tableAdapter, - @Nullable final Snapshot tableSnapshot, @NotNull final PartitionSpec partitionSpec, - @NotNull final IcebergInstructions instructions, + @NotNull final IcebergReadInstructions instructions, @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { - super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider); + super(tableAdapter, instructions, dataInstructionsProvider); // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included // in the output definition, so we can ignore duplicates. diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java deleted file mode 100644 index a77a02b341e..00000000000 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java +++ /dev/null @@ -1,81 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.iceberg.util; - -import io.deephaven.annotations.CopyableStyle; -import io.deephaven.engine.table.TableDefinition; -import org.immutables.value.Value; -import org.immutables.value.Value.Immutable; - -import java.util.Map; -import java.util.Optional; - -/** - * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in - * this class may change in the future. As such, callers may wish to explicitly set the values. - */ -@Immutable -@CopyableStyle -public abstract class IcebergInstructions { - /** - * The default {@link IcebergInstructions} to use when reading Iceberg data files. 
Providing this will use system - * defaults for cloud provider-specific parameters - */ - @SuppressWarnings("unused") - public static final IcebergInstructions DEFAULT = builder().build(); - - public static Builder builder() { - return ImmutableIcebergInstructions.builder(); - } - - /** - * The {@link TableDefinition} to use when reading Iceberg data files. - */ - public abstract Optional tableDefinition(); - - /** - * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud - * provider-specific instructions). - */ - public abstract Optional dataInstructions(); - - /** - * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg - * data files. - */ - public abstract Map columnRenames(); - - /** - * Return a copy of this instructions object with the column renames replaced by {@code entries}. - */ - public abstract IcebergInstructions withColumnRenames(Map entries); - - /** - * The {@link IcebergUpdateMode} mode to use when reading the Iceberg data files. Default is - * {@link IcebergUpdateMode#staticMode()}. 
- */ - @Value.Default - public IcebergUpdateMode updateMode() { - return IcebergUpdateMode.staticMode(); - } - - public interface Builder { - @SuppressWarnings("unused") - Builder tableDefinition(TableDefinition tableDefinition); - - @SuppressWarnings("unused") - Builder dataInstructions(Object s3Instructions); - - @SuppressWarnings("unused") - Builder putColumnRenames(String key, String value); - - @SuppressWarnings("unused") - Builder putAllColumnRenames(Map entries); - - @SuppressWarnings("unused") - Builder updateMode(IcebergUpdateMode updateMode); - - IcebergInstructions build(); - } -} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java new file mode 100644 index 00000000000..82271590900 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java @@ -0,0 +1,115 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import io.deephaven.annotations.CopyableStyle; +import io.deephaven.engine.table.TableDefinition; +import org.apache.iceberg.Snapshot; +import org.immutables.value.Value; +import org.immutables.value.Value.Immutable; + +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class provides instructions intended for reading Iceberg catalogs and tables. The default values documented in + * this class may change in the future. As such, callers may wish to explicitly set the values. + */ +@Immutable +@CopyableStyle +public abstract class IcebergReadInstructions { + /** + * The default {@link IcebergReadInstructions} to use when reading Iceberg data files. 
Providing this will use + * system defaults for cloud provider-specific parameters. + */ + @SuppressWarnings("unused") + public static final IcebergReadInstructions DEFAULT = builder().build(); + + public static Builder builder() { + return ImmutableIcebergReadInstructions.builder(); + } + + /** + * The {@link TableDefinition} to use when reading Iceberg data files. + */ + public abstract Optional tableDefinition(); + + /** + * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). + */ + public abstract Optional dataInstructions(); + + /** + * A {@link Map map} of rename instructions from Iceberg to Deephaven column names to use when reading the Iceberg + * data files. + */ + public abstract Map columnRenames(); + + /** + * Return a copy of this instructions object with the column renames replaced by {@code entries}. + */ + public abstract IcebergReadInstructions withColumnRenames(Map entries); + + /** + * The {@link IcebergUpdateMode} mode to use when reading the Iceberg data files. Default is + * {@link IcebergUpdateMode#staticMode()}. + */ + @Value.Default + public IcebergUpdateMode updateMode() { + return IcebergUpdateMode.staticMode(); + } + + /** + * The identifier of the snapshot to load for reading. If both this and {@link #snapshot()} are provided, the + * {@link Snapshot#snapshotId()} must match this. Otherwise, only one of them should be provided. If neither is + * provided, the latest snapshot will be loaded. + */ + public abstract OptionalLong snapshotId(); + + /** + * Return a copy of this instructions object with the snapshot ID replaced by {@code value}. + */ + public abstract IcebergReadInstructions withSnapshotId(long value); + + /** + * The snapshot to load for reading. If both this and {@link #snapshotId()} are provided, the + * {@link Snapshot#snapshotId()} must match the {@link #snapshotId()}. Otherwise, only one of them should be + * provided.
If neither is provided, the latest snapshot will be loaded. + */ + public abstract Optional snapshot(); + + /** + * Return a copy of this instructions object with the snapshot replaced by {@code value}. + */ + public abstract IcebergReadInstructions withSnapshot(Snapshot value); + + public interface Builder { + Builder tableDefinition(TableDefinition tableDefinition); + + Builder dataInstructions(Object s3Instructions); + + Builder putColumnRenames(String key, String value); + + Builder putAllColumnRenames(Map entries); + + Builder updateMode(IcebergUpdateMode updateMode); + + Builder snapshotId(long snapshotId); + + Builder snapshot(Snapshot snapshot); + + IcebergReadInstructions build(); + } + + @Value.Check + final void checkSnapshotId() { + if (snapshotId().isPresent() && snapshot().isPresent() && + snapshotId().getAsLong() != snapshot().get().snapshotId()) { + throw new IllegalArgumentException("If both snapshotID and snapshot are provided, the snapshot Ids " + + "must match, found " + snapshotId().getAsLong() + " and " + snapshot().get().snapshotId()); + } + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTable.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTable.java index 89ef0994011..2779731c306 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTable.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTable.java @@ -9,7 +9,7 @@ public interface IcebergTable extends Table { /** - * When the {@link IcebergInstructions#updateMode() update mode} for this table is + * When the {@link IcebergReadInstructions#updateMode() update mode} for this table is * {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with the latest snapshot from * the catalog. *

@@ -18,7 +18,7 @@ public interface IcebergTable extends Table { void update(); /** - * When the {@link IcebergInstructions#updateMode() update mode} for this table is + * When the {@link IcebergReadInstructions#updateMode() update mode} for this table is * {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from * the catalog. If the {@code snapshotId} is not found in the list of snapshots for the table, an * {@link IllegalArgumentException} is thrown. The input snapshot must also be newer (higher in sequence number) @@ -31,7 +31,7 @@ public interface IcebergTable extends Table { void update(final long snapshotId); /** - * When the {@link IcebergInstructions#updateMode() update mode} for this table is + * When the {@link IcebergReadInstructions#updateMode() update mode} for this table is * {@link IcebergUpdateMode#manualRefreshingMode()}, this call will update the table with a specific snapshot from * the catalog. The input snapshot must be newer (higher in sequence number) than the current snapshot or an * {@link IllegalArgumentException} is thrown. 
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index 8ea62b26027..4b91f511881 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -24,6 +24,7 @@ import io.deephaven.iceberg.location.IcebergTableLocationFactory; import io.deephaven.iceberg.location.IcebergTableLocationKey; import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; @@ -102,6 +103,7 @@ private List getSnapshots() { * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting * table will be static and contain the following columns: * + * * * * @@ -180,7 +182,7 @@ public Table snapshots() { * * @param snapshotId The identifier of the snapshot to load. * - * @return An Optional containing the requested snapshot if it exists. + * @return An {@link Optional} containing the requested {@link Snapshot} if it exists. */ private Optional snapshot(final long snapshotId) { Optional found = getSnapshots().stream() @@ -228,234 +230,171 @@ public synchronized Optional schema(final int schemaId) { } /** - * Return {@link TableDefinition table definition}. - * - * @return The table definition - */ - public TableDefinition definition() { - // Load the table from the catalog. - return definition(null); - } - - /** - * Return {@link TableDefinition table definition} with optional instructions for customizations while reading. 
- * - * @param instructions The instructions for customizations while reading (or null for default instructions) - * @return The table definition + * Retrieves the appropriate {@link Snapshot} based on the provided {@link IcebergReadInstructions}, or {@code null} + * if no {@link IcebergReadInstructions#snapshot() snapshot} or {@link IcebergReadInstructions#snapshotId() + * snapshotId} is provided. */ - public TableDefinition definition(@Nullable final IcebergInstructions instructions) { - // Load the table from the catalog. - return definition(null, instructions); + @InternalUseOnly + @Nullable + public Snapshot getSnapshot(@NotNull final IcebergReadInstructions readInstructions) { + if (readInstructions.snapshot().isPresent()) { + return readInstructions.snapshot().get(); + } else if (readInstructions.snapshotId().isPresent()) { + return snapshot(readInstructions.snapshotId().getAsLong()) + .orElseThrow(() -> new IllegalArgumentException( + "Snapshot with id " + readInstructions.snapshotId().getAsLong() + " not found for " + + "table " + tableIdentifier)); + } + return null; } /** - * Return {@link TableDefinition table definition} for the Iceberg table and snapshot id, with optional instructions - * for customizations while reading. - * - * @param snapshotId The identifier of the snapshot to load - * @param instructions The instructions for customizations while reading (or null for default instructions) - * @return The table definition + * Used to hold return value for {@link #getSpecAndSchema(IcebergReadInstructions)}. */ - public TableDefinition definition( - final long snapshotId, - @Nullable final IcebergInstructions instructions) { - // Find the snapshot with the given snapshot id - final Snapshot tableSnapshot = - snapshot(snapshotId).orElseThrow(() -> new IllegalArgumentException( - "Snapshot with id " + snapshotId + " not found for table " + tableIdentifier)); - - // Load the table from the catalog. 
- return definition(tableSnapshot, instructions); + private static final class SpecAndSchema { + private final Schema schema; + private final PartitionSpec partitionSpec; + private final IcebergReadInstructions readInstructions; + + private SpecAndSchema( + @NotNull final Schema schema, + @NotNull final PartitionSpec partitionSpec, + @NotNull final IcebergReadInstructions readInstructions) { + this.schema = schema; + this.partitionSpec = partitionSpec; + this.readInstructions = readInstructions; + } } /** - * Return {@link TableDefinition table definition} for the Iceberg table and snapshot, with optional instructions - * for customizations while reading. - * - * @param tableSnapshot The snapshot to load - * @param instructions The instructions for customizations while reading (or null for default instructions) - * @return The table definition + * Retrieve the schema and partition spec for the table based on the provided read instructions. Also, populate the + * read instructions with the requested snapshot, or the latest snapshot if none is requested. */ - public TableDefinition definition( - @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { - + private SpecAndSchema getSpecAndSchema(@NotNull final IcebergReadInstructions readInstructions) { + final Snapshot snapshot; final Schema schema; - final org.apache.iceberg.PartitionSpec partitionSpec; + final PartitionSpec partitionSpec; + final IcebergReadInstructions updatedInstructions; - if (tableSnapshot == null) { + final Snapshot snapshotFromInstructions = getSnapshot(readInstructions); + if (snapshotFromInstructions == null) { synchronized (this) { // Refresh only once and record the current schema and partition spec. refresh(); + snapshot = table.currentSnapshot(); schema = table.schema(); partitionSpec = table.spec(); } + if (snapshot != null) { + // Update the read instructions with the snapshot. 
+ updatedInstructions = readInstructions.withSnapshot(snapshot); + } else { + updatedInstructions = readInstructions; + } } else { // Use the schema from the snapshot - schema = schema(tableSnapshot.schemaId()).get(); + snapshot = snapshotFromInstructions; + schema = schema(snapshot.schemaId()).orElseThrow(() -> new IllegalArgumentException( + "Schema with id " + snapshot.schemaId() + " not found for table " + tableIdentifier + ", snapshot " + + snapshot.snapshotId())); partitionSpec = table.spec(); + updatedInstructions = readInstructions.withSnapshot(snapshot); } - final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions; - - return fromSchema(schema, - partitionSpec, - userInstructions.tableDefinition().orElse(null), - getRenameColumnMap(table, schema, userInstructions)); + return new SpecAndSchema(schema, partitionSpec, updatedInstructions); } /** - * Return {@link Table table} containing the {@link TableDefinition definition} of the Iceberg table. + * Return {@link TableDefinition table definition} corresponding to this iceberg table * - * @return The table definition as a Deephaven table + * @return The table definition */ - public Table definitionTable() { - return TableTools.metaTable(definition()); + public TableDefinition definition() { + return definition(IcebergReadInstructions.DEFAULT); } /** - * Return {@link Table table} containing the {@link TableDefinition definition} of the Iceberg table, with optional - * instructions for customizations while reading. + * Return {@link TableDefinition table definition} corresponding to this iceberg table * - * @param instructions The instructions for customizations while reading - * @return The table definition as a Deephaven table + * @param readInstructions The instructions for customizations while reading the table. 
+ * @return The table definition */ - public Table definitionTable(@Nullable final IcebergInstructions instructions) { - return TableTools.metaTable(definition(null, instructions)); + public TableDefinition definition(@NotNull final IcebergReadInstructions readInstructions) { + final SpecAndSchema specAndSchema = getSpecAndSchema(readInstructions); + final Schema schema = specAndSchema.schema; + final PartitionSpec partitionSpec = specAndSchema.partitionSpec; + final IcebergReadInstructions updatedInstructions = specAndSchema.readInstructions; + + return fromSchema(schema, + partitionSpec, + updatedInstructions.tableDefinition().orElse(null), + getRenameColumnMap(table, schema, updatedInstructions)); } /** - * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table and - * snapshot id, with optional instructions for customizations while reading. + * Return {@link Table table} containing the {@link TableDefinition definition} of this Iceberg table. * - * @param snapshotId The identifier of the snapshot to load - * @param instructions The instructions for customizations while reading (or null for default instructions) * @return The table definition as a Deephaven table */ - public Table definitionTable( - final long snapshotId, - @Nullable final IcebergInstructions instructions) { - return TableTools.metaTable(definition(snapshotId, instructions)); + public Table definitionTable() { + return definitionTable(IcebergReadInstructions.DEFAULT); } /** - * Return {@link Table table} containing the {@link TableDefinition definition} of a given Iceberg table and - * snapshot id, with optional instructions for customizations while reading. + * Return {@link Table table} containing the {@link TableDefinition definition} of this Iceberg table. 
* - * @param tableSnapshot The snapshot to load - * @param instructions The instructions for customizations while reading (or null for default instructions) + * @param readInstructions The instructions for customizations while reading the table. * @return The table definition as a Deephaven table */ - public Table definitionTable( - @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { - return TableTools.metaTable(definition(tableSnapshot, instructions)); + public Table definitionTable(final IcebergReadInstructions readInstructions) { + return TableTools.metaTable(definition(readInstructions)); } /** - * Read the latest snapshot of an Iceberg table from the Iceberg catalog as a Deephaven {@link Table table}. + * Read the latest snapshot of this Iceberg table from the Iceberg catalog as a Deephaven {@link Table table}. * * @return The loaded table */ public IcebergTable table() { - return table(null); + return table(IcebergReadInstructions.DEFAULT); } /** - * Read the latest snapshot of an Iceberg table from the Iceberg catalog as a Deephaven {@link Table table}. + * Read a snapshot of this Iceberg table from the Iceberg catalog as a Deephaven {@link Table table}. * - * @param instructions The instructions for customizations while reading (or null for default instructions) + * @param readInstructions The instructions for customizations while reading the table. * @return The loaded table */ - public IcebergTable table(@Nullable final IcebergInstructions instructions) { - return table(null, instructions); - } - - /** - * Read a snapshot of an Iceberg table from the Iceberg catalog. - * - * @param tableSnapshotId The snapshot id to load - * @return The loaded table - */ - public IcebergTable table(final long tableSnapshotId) { - return table(tableSnapshotId, null); - } - - /** - * Read a snapshot of an Iceberg table from the Iceberg catalog. 
- * - * @param tableSnapshotId The snapshot id to load - * @param instructions The instructions for customizations while reading (or null for default instructions) - * @return The loaded table - */ - public IcebergTable table(final long tableSnapshotId, @Nullable final IcebergInstructions instructions) { - // Find the snapshot with the given snapshot id - final Snapshot tableSnapshot = - snapshot(tableSnapshotId).orElseThrow(() -> new IllegalArgumentException( - "Snapshot with id " + tableSnapshotId + " not found for table " + tableIdentifier)); - - return table(tableSnapshot, instructions); - } - - /** - * Read a snapshot of an Iceberg table from the Iceberg catalog. - * - * @param tableSnapshot The snapshot to load - * @param instructions The instructions for customizations while reading - * @return The loaded table - */ - public IcebergTable table( - @Nullable final Snapshot tableSnapshot, - @Nullable final IcebergInstructions instructions) { - - final Snapshot snapshot; - final Schema schema; - final org.apache.iceberg.PartitionSpec partitionSpec; - - if (tableSnapshot == null) { - synchronized (this) { - // Refresh only once and record the current snapshot, schema (which may be newer than the - // snapshot schema), and partition spec. - refresh(); - snapshot = table.currentSnapshot(); - schema = table.schema(); - partitionSpec = table.spec(); - } - } else { - snapshot = tableSnapshot; - // Use the schema from the snapshot - schema = schema(tableSnapshot.schemaId()).get(); - partitionSpec = table.spec(); - } - - // Get default instructions if none are provided - final IcebergInstructions userInstructions = instructions == null ? 
IcebergInstructions.DEFAULT : instructions; + public IcebergTable table(@NotNull final IcebergReadInstructions readInstructions) { + final SpecAndSchema specAndSchema = getSpecAndSchema(readInstructions); + final Schema schema = specAndSchema.schema; + final PartitionSpec partitionSpec = specAndSchema.partitionSpec; + IcebergReadInstructions updatedInstructions = specAndSchema.readInstructions; // Get the user supplied table definition. - final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null); + final TableDefinition userTableDef = updatedInstructions.tableDefinition().orElse(null); // Map all the column names in the schema to their legalized names. - final Map legalizedColumnRenames = getRenameColumnMap(table, schema, userInstructions); + final Map legalizedColumnRenames = getRenameColumnMap(table, schema, updatedInstructions); // Get the table definition from the schema (potentially limited by the user supplied table definition and // applying column renames). final TableDefinition tableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames); // Create the final instructions with the legalized column renames. 
- final IcebergInstructions finalInstructions = userInstructions.withColumnRenames(legalizedColumnRenames); + updatedInstructions = updatedInstructions.withColumnRenames(legalizedColumnRenames); final IcebergBaseLayout keyFinder; if (partitionSpec.isUnpartitioned()) { // Create the flat layout location key finder - keyFinder = new IcebergFlatLayout(this, snapshot, finalInstructions, - dataInstructionsProviderLoader); + keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader); } else { // Create the partitioning column location key finder - keyFinder = new IcebergKeyValuePartitionedLayout(this, snapshot, partitionSpec, - finalInstructions, dataInstructionsProviderLoader); + keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, updatedInstructions, + dataInstructionsProviderLoader); } - if (finalInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) { + if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) { final IcebergTableLocationProviderBase locationProvider = new IcebergStaticTableLocationProvider<>( StandaloneTableKey.getInstance(), @@ -474,7 +413,7 @@ public IcebergTable table( final UpdateSourceRegistrar updateSourceRegistrar = ExecutionContext.getContext().getUpdateGraph(); final IcebergTableLocationProviderBase locationProvider; - if (finalInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.MANUAL_REFRESHING) { + if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.MANUAL_REFRESHING) { locationProvider = new IcebergManualRefreshTableLocationProvider<>( StandaloneTableKey.getInstance(), keyFinder, @@ -487,7 +426,7 @@ public IcebergTable table( keyFinder, new IcebergTableLocationFactory(), TableDataRefreshService.getSharedRefreshService(), - finalInstructions.updateMode().autoRefreshMs(), + updatedInstructions.updateMode().autoRefreshMs(), this, tableIdentifier); } @@ 
-525,7 +464,7 @@ public String toString() { private Map getRenameColumnMap( @NotNull final org.apache.iceberg.Table table, @NotNull final Schema schema, - @NotNull final IcebergInstructions instructions) { + @NotNull final IcebergReadInstructions instructions) { final Set takenNames = new HashSet<>(); diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/PyIceberg1Test.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/PyIceberg1Test.java index 927089710d1..835598d1380 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/PyIceberg1Test.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/PyIceberg1Test.java @@ -10,6 +10,7 @@ import io.deephaven.engine.util.TableTools; import io.deephaven.iceberg.sqlite.DbResource; import io.deephaven.iceberg.util.IcebergCatalogAdapter; +import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.iceberg.util.IcebergTools; import org.apache.iceberg.Snapshot; @@ -83,10 +84,14 @@ void cities1() { final Table cities1; { final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(CITIES_ID); - final TableDefinition td = tableAdapter.definition(SNAPSHOT_1_ID, null); + final TableDefinition td = tableAdapter.definition(IcebergReadInstructions.builder() + .snapshotId(SNAPSHOT_1_ID) + .build()); assertThat(td).isEqualTo(CITIES_1_TD); - cities1 = tableAdapter.table(SNAPSHOT_1_ID); + cities1 = tableAdapter.table(IcebergReadInstructions.builder() + .snapshotId(SNAPSHOT_1_ID) + .build()); assertThat(cities1.getDefinition()).isEqualTo(CITIES_1_TD); } final Table expectedCities1 = TableTools.newTable(CITIES_1_TD, @@ -101,10 +106,14 @@ void cities2() { final Table cities2; { final IcebergTableAdapter tableAdapter = catalogAdapter.loadTable(CITIES_ID); - final TableDefinition td = tableAdapter.definition(SNAPSHOT_2_ID, null); + final TableDefinition td = tableAdapter.definition(IcebergReadInstructions.builder() + 
.snapshotId(SNAPSHOT_2_ID) + .build()); assertThat(td).isEqualTo(CITIES_2_TD); - cities2 = tableAdapter.table(SNAPSHOT_2_ID); + cities2 = tableAdapter.table(IcebergReadInstructions.builder() + .snapshotId(SNAPSHOT_2_ID) + .build()); assertThat(cities2.getDefinition()).isEqualTo(CITIES_2_TD); } // TODO(deephaven-core#6118): Iceberg column rename handling diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 41fb0af901a..1411a6b9f24 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -3,7 +3,7 @@ # """ This module adds Iceberg table support into Deephaven. """ from __future__ import annotations -from typing import List, Optional, Union, Dict, Sequence +from typing import Optional, Dict import jpy @@ -14,7 +14,7 @@ from deephaven.jcompat import j_hashmap -_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") +_JIcebergReadInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergReadInstructions") _JIcebergUpdateMode = jpy.get_type("io.deephaven.iceberg.util.IcebergUpdateMode") _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") _JIcebergTableAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergTableAdapter") @@ -79,19 +79,20 @@ def j_object(self) -> jpy.JType: return self._j_object -class IcebergInstructions(JObjectWrapper): +class IcebergReadInstructions(JObjectWrapper): """ This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename instructions and table definitions, as well as special data instructions for loading data files from the cloud. 
""" - j_object_type = _JIcebergInstructions + j_object_type = _JIcebergReadInstructions def __init__(self, table_definition: Optional[TableDefinitionLike] = None, data_instructions: Optional[s3.S3Instructions] = None, column_renames: Optional[Dict[str, str]] = None, - update_mode: Optional[IcebergUpdateMode] = None): + update_mode: Optional[IcebergUpdateMode] = None, + snapshot_id: Optional[int] = None): """ Initializes the instructions using the provided parameters. @@ -105,6 +106,7 @@ def __init__(self, the output table. update_mode (Optional[IcebergUpdateMode]): The update mode for the table. If omitted, the default update mode of :py:func:`IcebergUpdateMode.static() ` is used. + snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. Raises: DHError: If unable to build the instructions object. @@ -126,6 +128,9 @@ def __init__(self, if update_mode: builder.updateMode(update_mode.j_object) + if snapshot_id: + builder.snapshotId(snapshot_id) + self._j_object = builder.build() except Exception as e: raise DHError(e, "Failed to build Iceberg instructions") from e @@ -199,49 +204,42 @@ def snapshots(self) -> Table: """ return Table(self.j_object.snapshots()) - def definition(self, instructions: Optional[IcebergInstructions] = None, snapshot_id: Optional[int] = None) -> Table: + def definition(self, instructions: Optional[IcebergReadInstructions] = None) -> Table: """ Returns the Deephaven table definition as a Deephaven table. Args: - instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions + instructions (Optional[IcebergReadInstructions]): the instructions for reading the table. These instructions can include column renames, table definition, and specific data instructions for reading the data files from the provider. If omitted, the table will be read with default instructions. 
- snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. Returns: a table containing the table definition. """ - if instructions: - instructions = instructions.j_object + if instructions is not None: + return Table(self.j_object.definitionTable(instructions.j_object)) + return Table(self.j_object.definitionTable()) - if snapshot_id is not None: - return Table(self.j_object.definitionTable(snapshot_id, instructions)) - return Table(self.j_object.definitionTable(instructions)) - - def table(self, instructions: Optional[IcebergInstructions] = None, snapshot_id: Optional[int] = None) -> IcebergTable: + def table(self, instructions: Optional[IcebergReadInstructions] = None) -> IcebergTable: """ Reads the table using the provided instructions. Optionally, a snapshot id can be provided to read a specific snapshot of the table. Args: - instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions + instructions (Optional[IcebergReadInstructions]): the instructions for reading the table. These instructions can include column renames, table definition, and specific data instructions for reading the data files from the provider. If omitted, the table will be read in `static()` mode without column renames or data instructions. - snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected. Returns: Table: the table read from the catalog. 
""" - if instructions: - instructions = instructions.j_object + if instructions is not None: + return IcebergTable(self.j_object.table(instructions.j_object)) + return IcebergTable(self.j_object.table()) - if snapshot_id: - return IcebergTable(self.j_object.table(snapshot_id, instructions)) - return IcebergTable(self.j_object.table(instructions)) @property def j_object(self) -> jpy.JType: diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py index 8934299b74d..a3dcb72ac2f 100644 --- a/py/server/tests/test_iceberg.py +++ b/py/server/tests/test_iceberg.py @@ -23,14 +23,14 @@ def tearDown(self): super().tearDown() def test_instruction_create_empty(self): - iceberg_instructions = iceberg.IcebergInstructions() + iceberg_read_instructions = iceberg.IcebergReadInstructions() def test_instruction_create_with_s3_instructions(self): s3_instructions = s3.S3Instructions(region_name="us-east-1", access_key_id="some_access_key_id", secret_access_key="som_secret_access_key" ) - iceberg_instructions = iceberg.IcebergInstructions(data_instructions=s3_instructions) + iceberg_read_instructions = iceberg.IcebergReadInstructions(data_instructions=s3_instructions) def test_instruction_create_with_col_renames(self): renames = { @@ -38,9 +38,9 @@ def test_instruction_create_with_col_renames(self): "old_name_b": "new_name_b", "old_name_c": "new_name_c" } - iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames) + iceberg_read_instructions = iceberg.IcebergReadInstructions(column_renames=renames) - col_rename_dict = j_map_to_dict(iceberg_instructions.j_object.columnRenames()) + col_rename_dict = j_map_to_dict(iceberg_read_instructions.j_object.columnRenames()) self.assertTrue(col_rename_dict["old_name_a"] == "new_name_a") self.assertTrue(col_rename_dict["old_name_b"] == "new_name_b") self.assertTrue(col_rename_dict["old_name_c"] == "new_name_c") @@ -52,8 +52,8 @@ def test_instruction_create_with_table_definition_dict(self): "z": dtypes.double, } - 
iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) - col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) + iceberg_read_instructions = iceberg.IcebergReadInstructions(table_definition=table_def) + col_names = j_list_to_list(iceberg_read_instructions.j_object.tableDefinition().get().getColumnNames()) self.assertTrue(col_names[0] == "x") self.assertTrue(col_names[1] == "y") self.assertTrue(col_names[2] == "z") @@ -66,9 +66,13 @@ def test_instruction_create_with_table_definition_list(self): col_def("z", dtypes.double), ] - iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def) - col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames()) + iceberg_read_instructions = iceberg.IcebergReadInstructions(table_definition=table_def) + col_names = j_list_to_list(iceberg_read_instructions.j_object.tableDefinition().get().getColumnNames()) self.assertTrue(col_names[0] == "Partition") self.assertTrue(col_names[1] == "x") self.assertTrue(col_names[2] == "y") self.assertTrue(col_names[3] == "z") + + def test_instruction_create_with_snapshot_id(self): + iceberg_read_instructions = iceberg.IcebergReadInstructions(snapshot_id=12345) + self.assertTrue(iceberg_read_instructions.j_object.snapshotId().getAsLong() == 12345)
Column NameDescription