From a13a0dc535398a948757c1ccc36d5c5115c9dc7c Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Mon, 24 Jun 2024 12:51:21 -0700
Subject: [PATCH] feat: expose Iceberg features to python users (#5590)

* WIP commit, functional but needs docs.
* Added AWS Glue functionality.
* Update Iceberg and AWS versions, itemize aws dependencies.
* Documentation improvements.
---
 buildSrc/src/main/groovy/Classpaths.groovy    |   4 +-
 extensions/iceberg/s3/build.gradle            |   4 +-
 .../iceberg/util/IcebergToolsS3.java          |  48 +++-
 .../iceberg/util/IcebergToolsTest.java        |  61 ++++-
 .../iceberg/util/IcebergCatalogAdapter.java   | 108 ++++++--
 .../iceberg/util/IcebergInstructions.java     |   7 +
 py/server/deephaven/experimental/iceberg.py   | 252 ++++++++++++++++++
 py/server/deephaven/jcompat.py                |  31 +++
 py/server/deephaven/parquet.py                |  22 +-
 py/server/deephaven/stream/table_publisher.py |  13 +-
 py/server/tests/test_iceberg.py               |  76 ++++++
 11 files changed, 562 insertions(+), 64 deletions(-)
 create mode 100644 py/server/deephaven/experimental/iceberg.py
 create mode 100644 py/server/tests/test_iceberg.py

diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy
index 86c342bb0c7..df2877db0f0 100644
--- a/buildSrc/src/main/groovy/Classpaths.groovy
+++ b/buildSrc/src/main/groovy/Classpaths.groovy
@@ -126,10 +126,10 @@ class Classpaths {
     static final String HADOOP_VERSION = '3.4.0'
 
     static final String ICEBERG_GROUP = 'org.apache.iceberg'
-    static final String ICEBERG_VERSION = '1.5.0'
+    static final String ICEBERG_VERSION = '1.5.2'
 
     static final String AWSSDK_GROUP = 'software.amazon.awssdk'
-    static final String AWSSDK_VERSION = '2.23.19'
+    static final String AWSSDK_VERSION = '2.24.5'
 
     static final String TESTCONTAINER_GROUP = 'org.testcontainers'
     static final String TESTCONTAINER_VERSION = '1.19.4'
diff --git a/extensions/iceberg/s3/build.gradle b/extensions/iceberg/s3/build.gradle
index 359651ec7e6..be495b1373d 100644
--- a/extensions/iceberg/s3/build.gradle
+++ b/extensions/iceberg/s3/build.gradle
@@ -16,8 +16,10 @@ dependencies {
     implementation project(':extensions-s3')
     implementation "org.apache.iceberg:iceberg-aws"
 
-    runtimeOnly "org.apache.iceberg:iceberg-aws-bundle"
+    Classpaths.inheritAWSSDK(project)
+    runtimeOnly "software.amazon.awssdk:sts"
+    runtimeOnly "software.amazon.awssdk:glue"
 
     Classpaths.inheritTestContainers(project)
diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
index 6f7845c43eb..166b47e5d28 100644
--- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
+++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/IcebergToolsS3.java
@@ -7,6 +7,7 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
 import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.rest.RESTCatalog;
@@ -23,6 +24,20 @@ public class IcebergToolsS3 extends IcebergTools {
 
     private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
 
+    /**
+     * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
+     * value, the system defaults will be used.
+     *
+     * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+     * @param catalogURI the URI of the Iceberg REST catalog
+     * @param warehouseLocation the location of the S3 datafiles backing the catalog
+     * @param region the AWS region; if omitted, system defaults will be used
+     * @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
+     * @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
+     * @param endpointOverride the S3 endpoint override; this is useful for testing with an S3-compatible local
+     *        service such as MinIO or LocalStack
+     * @return the Iceberg catalog adapter
+     */
     public static IcebergCatalogAdapter createS3Rest(
             @Nullable final String name,
             @NotNull final String catalogURI,
@@ -53,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest(
             properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
         }
 
-        // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
         final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);
 
         final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
@@ -62,4 +76,36 @@ public static IcebergCatalogAdapter createS3Rest(
 
         return new IcebergCatalogAdapter(catalog, fileIO);
     }
+
+    /**
+     * Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
+     * and credentials. These can be configured by following the AWS Authentication and access credentials guide.
+     *
+     * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+     * @param catalogURI the URI of the AWS Glue catalog
+     * @param warehouseLocation the location of the S3 datafiles backing the catalog
+     * @return the Iceberg catalog adapter
+     */
+    public static IcebergCatalogAdapter createGlue(
+            @Nullable final String name,
+            @NotNull final String catalogURI,
+            @NotNull final String warehouseLocation) {
+
+        // Set up the properties map for the Iceberg catalog
+        final Map<String, String> properties = new HashMap<>();
+
+        final GlueCatalog catalog = new GlueCatalog();
+
+        properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
+        properties.put(CatalogProperties.URI, catalogURI);
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+        final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);
+
+        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
+        catalog.initialize(catalogName, properties);
+
+        return new IcebergCatalogAdapter(catalog, fileIO);
+    }
 }
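A rough sketch of how these two factory methods surface in Python, via the wrapper module added later in this patch; the URIs, endpoint, and credentials below are illustrative placeholders, not working values:

    from deephaven.experimental import iceberg

    adapter = iceberg.adapter_s3_rest(
        catalog_uri="http://localhost:8181",          # hypothetical local REST catalog
        warehouse_location="s3a://warehouse/wh",
        region_name="us-east-1",
        access_key_id="admin",                        # both keys must be given to use static credentials
        secret_access_key="password",
        end_point_override="http://localhost:9000",   # e.g. a local MinIO or LocalStack service
    )

    glue_adapter = iceberg.adapter_aws_glue(
        catalog_uri="s3://my-bucket/catalog",         # hypothetical Glue catalog URI
        warehouse_location="s3://my-bucket/warehouse",
    )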
diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
index 0fd3b3fcf7e..7544976f27b 100644
--- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
+++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergToolsTest.java
@@ -129,19 +129,23 @@ public void testListTables() {
 
         final Namespace ns = Namespace.of("sales");
 
-        final Collection<TableIdentifier> tables = adapter.listTables(ns);
+        Collection<TableIdentifier> tables = adapter.listTables(ns);
         Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
                 "tables.contains(sales_partitioned)");
         Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");
 
-        final Table table = adapter.listTablesAsTable(ns);
+        Table table = adapter.listTablesAsTable(ns);
         Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
-        Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
-        Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
-        Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
-                "table_identifier_object column type");
+        Assert.eqTrue(table.getColumnSource("Namespace").getType().equals(String.class), "Namespace column type");
+        Assert.eqTrue(table.getColumnSource("TableName").getType().equals(String.class), "TableName column type");
+        Assert.eqTrue(table.getColumnSource("TableIdentifierObject").getType().equals(TableIdentifier.class),
+                "TableIdentifierObject column type");
+
+        // Test the string versions of the methods
+        table = adapter.listTablesAsTable("sales");
+        Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
     }
 
@@ -160,7 +164,7 @@ public void testListSnapshots() {
         Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
         Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");
 
-        final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
+        Table table = adapter.listSnapshotsAsTable(tableIdentifier);
         Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
-        Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
-        Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
+        Assert.eqTrue(table.getColumnSource("Id").getType().equals(long.class), "Id column type");
+        Assert.eqTrue(table.getColumnSource("TimestampMs").getType().equals(long.class), "TimestampMs column type");
@@ -168,6 +172,10 @@ public void testListSnapshots() {
-        Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
-        Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
-        Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
-                "snapshot_object column type");
+        Assert.eqTrue(table.getColumnSource("Operation").getType().equals(String.class), "Operation column type");
+        Assert.eqTrue(table.getColumnSource("Summary").getType().equals(Map.class), "Summary column type");
+        Assert.eqTrue(table.getColumnSource("SnapshotObject").getType().equals(Snapshot.class),
+                "SnapshotObject column type");
+
+        // Test the string versions of the methods
+        table = adapter.listSnapshotsAsTable("sales.sales_multi");
+        Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
     }
 
@@ -180,7 +188,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, Ti
         final Namespace ns = Namespace.of("sales");
         final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
 
-        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+        io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+        table = adapter.readTable("sales.sales_partitioned", instructions);
 
         // Verify we retrieved all the rows.
         Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -196,9 +210,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti
         final Namespace ns = Namespace.of("sales");
         final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
 
-        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+        io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
 
         Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+        table = adapter.readTable("sales.sales_multi", instructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
     }
 
@@ -211,7 +231,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, Ti
         final Namespace ns = Namespace.of("sales");
         final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
 
-        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+        io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+        table = adapter.readTable("sales.sales_single", instructions);
 
         // Verify we retrieved all the rows.
         Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -563,16 +589,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedEx
         final List<Snapshot> snapshots = adapter.listSnapshots(tableId);
 
         // Verify we retrieved all the rows.
-        final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
+        io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
         Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");
 
-        final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
+        io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
         Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");
 
-        final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
+        io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
         Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");
 
-        final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
+        io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
         Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+
+        // Verify we retrieved all the rows.
+        table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
+        Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");
+
+        table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
+        Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");
+
+        table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
+        Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");
+
+        table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
+        Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
     }
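The string-identifier and snapshot-id overloads exercised above map to Python roughly as follows — a sketch, assuming an adapter built as in the earlier example; the snapshot id is one from this test fixture:

    snapshots = adapter.snapshots("sales.sales_multi")   # static Deephaven table of snapshot metadata
    table = adapter.read_table("sales.sales_multi", snapshot_id=3247344357341484163)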
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
index c379c715c6d..c54535599c3 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergCatalogAdapter.java
@@ -55,7 +55,7 @@ public class IcebergCatalogAdapter {
 
     /**
      * Create a single {@link TableDefinition} from a given Schema, PartitionSpec, and TableDefinition. Takes into
-     * account {@link Map column rename instructions}
+     * account {@link Map<> column rename instructions}
      *
      * @param schema The schema of the table.
     * @param partitionSpec The partition specification of the table.
@@ -197,11 +197,11 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
 
         // Create the column source(s)
         final String[] namespaceArr = new String[(int) size];
-        columnSourceMap.put("namespace",
+        columnSourceMap.put("Namespace",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null));
 
         final Namespace[] namespaceObjectArr = new Namespace[(int) size];
-        columnSourceMap.put("namespace_object",
+        columnSourceMap.put("NamespaceObject",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceObjectArr, Namespace.class, null));
 
         // Populate the column source arrays
@@ -215,6 +215,16 @@ public Table listNamespacesAsTable(@NotNull final Namespace namespace) {
         return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
     }
 
+    /**
+     * List all {@link Namespace namespaces} in a given namespace as a Deephaven {@link Table table}. The resulting
+     * table will be static and contain the same information as {@link #listNamespaces(Namespace)}.
+     *
+     * @return A {@link Table table} of all namespaces.
+     */
+    public Table listNamespacesAsTable(@NotNull final String... namespace) {
+        return listNamespacesAsTable(Namespace.of(namespace));
+    }
+
     /**
      * List all Iceberg {@link TableIdentifier tables} in a given namespace.
     *
@@ -241,15 +251,15 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
 
         // Create the column source(s)
         final String[] namespaceArr = new String[(int) size];
-        columnSourceMap.put("namespace",
+        columnSourceMap.put("Namespace",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(namespaceArr, String.class, null));
 
         final String[] tableNameArr = new String[(int) size];
-        columnSourceMap.put("table_name",
+        columnSourceMap.put("TableName",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(tableNameArr, String.class, null));
 
         final TableIdentifier[] tableIdentifierArr = new TableIdentifier[(int) size];
-        columnSourceMap.put("table_identifier_object",
+        columnSourceMap.put("TableIdentifierObject",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(tableIdentifierArr, TableIdentifier.class, null));
 
         // Populate the column source arrays
@@ -264,6 +274,10 @@ public Table listTablesAsTable(@NotNull final Namespace namespace) {
         return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
     }
 
+    public Table listTablesAsTable(@NotNull final String... namespace) {
+        return listTablesAsTable(Namespace.of(namespace));
+    }
+
     /**
      * List all {@link Snapshot snapshots} of a given Iceberg table.
     *
@@ -292,22 +306,22 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
 
         // Create the column source(s)
         final long[] idArr = new long[(int) size];
-        columnSourceMap.put("id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null));
+        columnSourceMap.put("Id", InMemoryColumnSource.getImmutableMemoryColumnSource(idArr, long.class, null));
 
         final long[] timestampArr = new long[(int) size];
-        columnSourceMap.put("timestamp_ms",
+        columnSourceMap.put("TimestampMs",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(timestampArr, long.class, null));
 
         final String[] operatorArr = new String[(int) size];
-        columnSourceMap.put("operation",
+        columnSourceMap.put("Operation",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(operatorArr, String.class, null));
 
         final Map[] summaryArr = new Map[(int) size];
-        columnSourceMap.put("summary",
+        columnSourceMap.put("Summary",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(summaryArr, Map.class, null));
 
         final Snapshot[] snapshotArr = new Snapshot[(int) size];
-        columnSourceMap.put("snapshot_object",
+        columnSourceMap.put("SnapshotObject",
                 InMemoryColumnSource.getImmutableMemoryColumnSource(snapshotArr, Snapshot.class, null));
 
         // Populate the column source(s)
@@ -324,6 +338,17 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
         return new QueryTable(RowSetFactory.flat(size).toTracking(), columnSourceMap);
     }
 
+    /**
+     * List all {@link Snapshot snapshots} of a given Iceberg table as a Deephaven {@link Table table}. The resulting
+     * table will be static and contain the same information as {@link #listSnapshots(TableIdentifier)}.
+     *
+     * @param tableIdentifier The identifier of the table from which to gather snapshots.
+     * @return A table of all snapshots of the given table.
+     */
+    public Table listSnapshotsAsTable(@NotNull final String tableIdentifier) {
+        return listSnapshotsAsTable(TableIdentifier.parse(tableIdentifier));
+    }
+
     /**
      * Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
     *
@@ -334,10 +359,24 @@ public Table listSnapshotsAsTable(@NotNull final TableIdentifier tableIdentifier
     @SuppressWarnings("unused")
     public Table readTable(
             @NotNull final TableIdentifier tableIdentifier,
-            @NotNull final IcebergInstructions instructions) {
+            @Nullable final IcebergInstructions instructions) {
         return readTableInternal(tableIdentifier, null, instructions);
     }
 
+    /**
+     * Read the latest static snapshot of an Iceberg table from the Iceberg catalog.
+     *
+     * @param tableIdentifier The table identifier to load
+     * @param instructions The instructions for customizations while reading
+     * @return The loaded table
+     */
+    @SuppressWarnings("unused")
+    public Table readTable(
+            @NotNull final String tableIdentifier,
+            @Nullable final IcebergInstructions instructions) {
+        return readTable(TableIdentifier.parse(tableIdentifier), instructions);
+    }
+
     /**
      * Read a static snapshot of an Iceberg table from the Iceberg catalog.
     *
@@ -350,7 +389,7 @@ public Table readTable(
     public Table readTable(
             @NotNull final TableIdentifier tableIdentifier,
             final long tableSnapshotId,
-            @NotNull final IcebergInstructions instructions) {
+            @Nullable final IcebergInstructions instructions) {
 
         // Find the snapshot with the given snapshot id
         final Snapshot tableSnapshot = listSnapshots(tableIdentifier).stream()
@@ -361,6 +400,22 @@ public Table readTable(
         return readTableInternal(tableIdentifier, tableSnapshot, instructions);
     }
 
+    /**
+     * Read a static snapshot of an Iceberg table from the Iceberg catalog.
+     *
+     * @param tableIdentifier The table identifier to load
+     * @param tableSnapshotId The snapshot id to load
+     * @param instructions The instructions for customizations while reading
+     * @return The loaded table
+     */
+    @SuppressWarnings("unused")
+    public Table readTable(
+            @NotNull final String tableIdentifier,
+            final long tableSnapshotId,
+            @Nullable final IcebergInstructions instructions) {
+        return readTable(TableIdentifier.parse(tableIdentifier), tableSnapshotId, instructions);
+    }
+
     /**
      * Read a static snapshot of an Iceberg table from the Iceberg catalog.
     *
@@ -373,32 +428,35 @@ public Table readTable(
     public Table readTable(
             @NotNull final TableIdentifier tableIdentifier,
             @NotNull final Snapshot tableSnapshot,
-            @NotNull final IcebergInstructions instructions) {
+            @Nullable final IcebergInstructions instructions) {
         return readTableInternal(tableIdentifier, tableSnapshot, instructions);
     }
 
     private Table readTableInternal(
             @NotNull final TableIdentifier tableIdentifier,
             @Nullable final Snapshot tableSnapshot,
-            @NotNull final IcebergInstructions instructions) {
+            @Nullable final IcebergInstructions instructions) {
 
-        // Load the table from the catalog
+        // Load the table from the catalog.
         final org.apache.iceberg.Table table = catalog.loadTable(tableIdentifier);
 
         // Do we want the latest or a specific snapshot?
         final Snapshot snapshot = tableSnapshot != null ? tableSnapshot : table.currentSnapshot();
         final Schema schema = table.schemas().get(snapshot.schemaId());
 
-        // Load the partitioning schema
+        // Load the partitioning schema.
         final org.apache.iceberg.PartitionSpec partitionSpec = table.spec();
 
+        // Get default instructions if none are provided
+        final IcebergInstructions userInstructions = instructions == null ? IcebergInstructions.DEFAULT : instructions;
+
         // Get the user supplied table definition.
-        final TableDefinition userTableDef = instructions.tableDefinition().orElse(null);
+        final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null);
 
         // Get the table definition from the schema (potentially limited by the user supplied table definition and
         // applying column renames).
         final TableDefinition icebergTableDef =
-                fromSchema(schema, partitionSpec, userTableDef, instructions.columnRenames());
+                fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames());
 
         // If the user supplied a table definition, make sure it's fully compatible.
         final TableDefinition tableDef;
@@ -433,11 +491,11 @@ private Table readTableInternal(
 
         if (partitionSpec.isUnpartitioned()) {
             // Create the flat layout location key finder
-            keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, instructions);
+            keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions);
         } else {
             // Create the partitioning column location key finder
             keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec,
-                    instructions);
+                    userInstructions);
         }
 
         refreshService = null;
@@ -459,4 +517,12 @@ private Table readTableInternal(
 
         return result;
     }
+
+    /**
+     * Returns the underlying Iceberg {@link Catalog catalog} used by this adapter.
+     */
+    @SuppressWarnings("unused")
+    public Catalog catalog() {
+        return catalog;
+    }
 }
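In Python, the adapter's listing and reading entry points correspond roughly to the sketch below; the column names match the CamelCase sources created above, and omitting instructions falls back to IcebergInstructions.DEFAULT (adapter as in the earlier example):

    namespaces = adapter.namespaces()                    # columns: Namespace, NamespaceObject
    tables = adapter.tables("sales")                     # columns: Namespace, TableName, TableIdentifierObject
    snapshots = adapter.snapshots("sales.sales_multi")   # columns: Id, TimestampMs, Operation, Summary, SnapshotObject
    latest = adapter.read_table("sales.sales_multi")     # current snapshot, default instructions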
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
index 4788e0e8714..b595b4cfd14 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergInstructions.java
@@ -17,6 +17,13 @@
 @Immutable
 @BuildableStyle
 public abstract class IcebergInstructions {
+    /**
+     * The default {@link IcebergInstructions} to use when reading Iceberg data files. Providing this will use system
+     * defaults for cloud provider-specific parameters.
+     */
+    @SuppressWarnings("unused")
+    public static final IcebergInstructions DEFAULT = builder().build();
+
     public static Builder builder() {
         return ImmutableIcebergInstructions.builder();
     }
""" +from typing import List, Optional, Union, Dict, Sequence + +import jpy + +from deephaven import DHError +from deephaven._wrapper import JObjectWrapper +from deephaven.column import Column +from deephaven.dtypes import DType +from deephaven.experimental import s3 + +from deephaven.jcompat import j_table_definition + +from deephaven.table import Table + +_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions") +_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") + +# IcebergToolsS3 is an optional library +try: + _JIcebergToolsS3 = jpy.get_type("io.deephaven.iceberg.util.IcebergToolsS3") +except Exception: + _JIcebergToolsS3 = None + +_JNamespace = jpy.get_type("org.apache.iceberg.catalog.Namespace") +_JTableIdentifier = jpy.get_type("org.apache.iceberg.catalog.TableIdentifier") +_JSnapshot = jpy.get_type("org.apache.iceberg.Snapshot") + + +class IcebergInstructions(JObjectWrapper): + """ + This class specifies the instructions for reading an Iceberg table into Deephaven. These include column rename + instructions and table definitions, as well as special data instructions for loading data files from the cloud. + """ + + j_object_type = _JIcebergInstructions + + def __init__(self, + table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None, + data_instructions: Optional[s3.S3Instructions] = None, + column_renames: Optional[Dict[str, str]] = None): + """ + Initializes the instructions using the provided parameters. + + Args: + table_definition (Optional[Union[Dict[str, DType], List[Column], None]]): the table definition; if omitted, + the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table + will have that definition. This is useful for specifying a subset of the Iceberg schema columns. + data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when + reading files from a non-local file system, like S3. + column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in + the output table. + + Raises: + DHError: If unable to build the instructions object. + """ + + try: + builder = self.j_object_type.builder() + + if table_definition is not None: + builder.tableDefinition(j_table_definition(table_definition)) + + if data_instructions is not None: + builder.dataInstructions(data_instructions.j_object) + + if column_renames is not None: + for old_name, new_name in column_renames.items(): + builder.putColumnRenames(old_name, new_name) + + self._j_object = builder.build() + except Exception as e: + raise DHError(e, "Failed to build Iceberg instructions") from e + + @property + def j_object(self) -> jpy.JType: + return self._j_object + + +class IcebergCatalogAdapter(JObjectWrapper): + """ + This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and + snapshots, as well as reading Iceberg tables into Deephaven tables. + """ + j_object_type = _JIcebergCatalogAdapter or type(None) + + def __init__(self, j_object: _JIcebergCatalogAdapter): + self.j_catalog_adapter = j_object + + def namespaces(self, namespace: Optional[str] = None) -> Table: + """ + Returns information on the namespaces in the catalog as a Deephaven table. If a namespace is specified, the + tables in that namespace are listed; otherwise the top-level namespaces are listed. 
+
+
+class IcebergCatalogAdapter(JObjectWrapper):
+    """
+    This class provides an interface for interacting with Iceberg catalogs. It allows listing namespaces, tables and
+    snapshots, as well as reading Iceberg tables into Deephaven tables.
+    """
+    j_object_type = _JIcebergCatalogAdapter or type(None)
+
+    def __init__(self, j_object: _JIcebergCatalogAdapter):
+        self.j_catalog_adapter = j_object
+
+    def namespaces(self, namespace: Optional[str] = None) -> Table:
+        """
+        Returns information on the namespaces in the catalog as a Deephaven table. If a namespace is specified, the
+        namespaces nested under it are listed; otherwise the top-level namespaces are listed.
+
+        Args:
+            namespace (Optional[str]): the higher-level namespace from which to list namespaces; if omitted, the
+                top-level namespaces are listed.
+
+        Returns:
+            a table containing the namespaces.
+        """
+
+        if namespace is not None:
+            return Table(self.j_object.listNamespacesAsTable(namespace))
+        return Table(self.j_object.listNamespacesAsTable())
+
+    def tables(self, namespace: str) -> Table:
+        """
+        Returns information on the tables in the specified namespace as a Deephaven table.
+
+        Args:
+            namespace (str): the namespace from which to list tables.
+
+        Returns:
+            a table containing the tables in the provided namespace.
+        """
+
+        if namespace is not None:
+            return Table(self.j_object.listTablesAsTable(namespace))
+        return Table(self.j_object.listTablesAsTable())
+
+    def snapshots(self, table_identifier: str) -> Table:
+        """
+        Returns information on the snapshots of the specified table as a Deephaven table.
+
+        Args:
+            table_identifier (str): the table from which to list snapshots.
+
+        Returns:
+            a table containing the snapshot information.
+        """
+
+        return Table(self.j_object.listSnapshotsAsTable(table_identifier))
+
+    def read_table(self, table_identifier: str, instructions: Optional[IcebergInstructions] = None,
+                   snapshot_id: Optional[int] = None) -> Table:
+        """
+        Reads the table from the catalog using the provided instructions. Optionally, a snapshot id can be provided to
+        read a specific snapshot of the table.
+
+        Args:
+            table_identifier (str): the table to read.
+            instructions (Optional[IcebergInstructions]): the instructions for reading the table. These instructions
+                can include column renames, table definition, and specific data instructions for reading the data
+                files from the provider. If omitted, the table will be read with default instructions.
+            snapshot_id (Optional[int]): the snapshot id to read; if omitted the most recent snapshot will be selected.
+
+        Returns:
+            Table: the table read from the catalog.
+        """
+
+        if instructions is not None:
+            instructions_object = instructions.j_object
+        else:
+            instructions_object = _JIcebergInstructions.DEFAULT
+
+        if snapshot_id is not None:
+            return Table(self.j_object.readTable(table_identifier, snapshot_id, instructions_object))
+        return Table(self.j_object.readTable(table_identifier, instructions_object))
+
+    @property
+    def j_object(self) -> jpy.JType:
+        return self.j_catalog_adapter
+
+
+def adapter_s3_rest(
+        catalog_uri: str,
+        warehouse_location: str,
+        name: Optional[str] = None,
+        region_name: Optional[str] = None,
+        access_key_id: Optional[str] = None,
+        secret_access_key: Optional[str] = None,
+        end_point_override: Optional[str] = None
+) -> IcebergCatalogAdapter:
+    """
+    Create a catalog adapter using an S3-compatible provider and a REST catalog.
+
+    Args:
+        catalog_uri (str): the URI of the REST catalog.
+        warehouse_location (str): the location of the warehouse.
+        name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
+            catalog URI.
+        region_name (Optional[str]): the S3 region name to use; if omitted, the AWS SDK picks the default region from
+            the 'aws.region' system property, the "AWS_REGION" environment variable, the {user.home}/.aws/credentials
+            or {user.home}/.aws/config files, or the EC2 metadata service, when running in EC2.
+        access_key_id (Optional[str]): the access key for reading files. Both access key and secret access key must be
+            provided to use static credentials; otherwise default credentials will be used.
+        secret_access_key (Optional[str]): the secret access key for reading files. Both access key and secret key
+            must be provided to use static credentials; otherwise default credentials will be used.
+        end_point_override (Optional[str]): the S3 endpoint to connect to. Callers connecting to AWS do not typically
+            need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs.
+
+    Returns:
+        IcebergCatalogAdapter: the catalog adapter for the provided S3 REST catalog.
+
+    Raises:
+        DHError: If unable to build the catalog adapter.
+    """
+    if not _JIcebergToolsS3:
+        raise DHError(message="`adapter_s3_rest` requires the Iceberg-specific Deephaven S3 extensions to be "
+                              "included in the package")
+
+    try:
+        return IcebergCatalogAdapter(
+            _JIcebergToolsS3.createS3Rest(
+                name,
+                catalog_uri,
+                warehouse_location,
+                region_name,
+                access_key_id,
+                secret_access_key,
+                end_point_override))
+    except Exception as e:
+        raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
+
+
+def adapter_aws_glue(
+        catalog_uri: str,
+        warehouse_location: str,
+        name: Optional[str] = None
+) -> IcebergCatalogAdapter:
+    """
+    Create a catalog adapter using an AWS Glue catalog.
+
+    Args:
+        catalog_uri (str): the URI of the AWS Glue catalog.
+        warehouse_location (str): the location of the warehouse.
+        name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
+            catalog URI.
+
+    Returns:
+        IcebergCatalogAdapter: the catalog adapter for the provided AWS Glue catalog.
+
+    Raises:
+        DHError: If unable to build the catalog adapter.
+    """
+    if not _JIcebergToolsS3:
+        raise DHError(message="`adapter_aws_glue` requires the Iceberg-specific Deephaven S3 extensions to "
+                              "be included in the package")
+
+    try:
+        return IcebergCatalogAdapter(
+            _JIcebergToolsS3.createGlue(
+                name,
+                catalog_uri,
+                warehouse_location))
+    except Exception as e:
+        raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
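End to end, the new module is expected to be used along these lines — a sketch; the catalog location and column names are illustrative, loosely following the test data used elsewhere in this patch:

    from deephaven import dtypes
    from deephaven.experimental import iceberg

    adapter = iceberg.adapter_s3_rest(
        catalog_uri="http://localhost:8181",      # hypothetical REST catalog
        warehouse_location="s3a://warehouse/wh",
    )

    instructions = iceberg.IcebergInstructions(
        table_definition={"Region": dtypes.string, "Unit_Price": dtypes.double},  # subset of the Iceberg schema
        column_renames={"RegionName": "Region"},                                  # illustrative old -> new names
    )

    table = adapter.read_table("sales.sales_multi", instructions=instructions)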
diff --git a/py/server/deephaven/jcompat.py b/py/server/deephaven/jcompat.py
index e6e921fd8f1..d807cb472f3 100644
--- a/py/server/deephaven/jcompat.py
+++ b/py/server/deephaven/jcompat.py
@@ -14,9 +14,11 @@
 from deephaven import dtypes, DHError
 from deephaven._wrapper import unwrap, wrap_j_object, JObjectWrapper
 from deephaven.dtypes import DType, _PRIMITIVE_DTYPE_NULL_MAP
+from deephaven.column import Column
 
 _NULL_BOOLEAN_AS_BYTE = jpy.get_type("io.deephaven.util.BooleanUtils").NULL_BOOLEAN_AS_BYTE
 _JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
+_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
 
 _DH_PANDAS_NULLABLE_TYPE_MAP: Dict[DType, pd.api.extensions.ExtensionDtype] = {
     dtypes.bool_: pd.BooleanDtype,
@@ -325,6 +327,35 @@ def _j_array_to_series(dtype: DType, j_array: jpy.JType, conv_null: bool) -> pd.
     return s
 
 
+def j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]:
+    """Produce a Deephaven TableDefinition from user input.
+
+    Args:
+        table_definition (Union[Dict[str, DType], List[Column], None]): the table definition as a dictionary of
+            column names and their corresponding data types, or a list of Column objects
+
+    Returns:
+        a Deephaven TableDefinition object, or None if the input is None
+
+    Raises:
+        DHError: If the input is not a Dict, a List, or None.
+    """
+    if table_definition is None:
+        return None
+    elif isinstance(table_definition, Dict):
+        return _JTableDefinition.of(
+            [
+                Column(name=name, data_type=dtype).j_column_definition
+                for name, dtype in table_definition.items()
+            ]
+        )
+    elif isinstance(table_definition, List):
+        return _JTableDefinition.of(
+            [col.j_column_definition for col in table_definition]
+        )
+    else:
+        raise DHError(f"Unexpected table_definition type: {type(table_definition)}")
+
+
 class AutoCloseable(JObjectWrapper):
     """A context manager wrapper to allow Java AutoCloseable to be used in with statements.
diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py
index 76c9dd86735..dc877660671 100644
--- a/py/server/deephaven/parquet.py
+++ b/py/server/deephaven/parquet.py
@@ -13,7 +13,7 @@
 from deephaven import DHError
 from deephaven.column import Column
 from deephaven.dtypes import DType
-from deephaven.jcompat import j_array_list
+from deephaven.jcompat import j_array_list, j_table_definition
 from deephaven.table import Table, PartitionedTable
 from deephaven.experimental import s3
 
@@ -135,7 +135,7 @@ def _build_parquet_instructions(
         builder.setFileLayout(_j_file_layout(file_layout))
 
     if table_definition is not None:
-        builder.setTableDefinition(_j_table_definition(table_definition))
+        builder.setTableDefinition(j_table_definition(table_definition))
 
     if index_columns:
         builder.addAllIndexColumns(_j_list_of_list_of_string(index_columns))
@@ -146,24 +146,6 @@ def _build_parquet_instructions(
     return builder.build()
 
 
-def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]:
-    if table_definition is None:
-        return None
-    elif isinstance(table_definition, Dict):
-        return _JTableDefinition.of(
-            [
-                Column(name=name, data_type=dtype).j_column_definition
-                for name, dtype in table_definition.items()
-            ]
-        )
-    elif isinstance(table_definition, List):
-        return _JTableDefinition.of(
-            [col.j_column_definition for col in table_definition]
-        )
-    else:
-        raise DHError(f"Unexpected table_definition type: {type(table_definition)}")
-
-
 def _j_file_layout(file_layout: Optional[ParquetFileLayout]) -> Optional[jpy.JType]:
     if file_layout is None:
         return None
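The now-shared helper accepts either form of definition, or None; a quick sketch with arbitrary names:

    from deephaven import dtypes
    from deephaven.jcompat import j_table_definition

    j_def = j_table_definition({"X": dtypes.int32, "Y": dtypes.double})  # dict of name -> DType
    assert j_table_definition(None) is None                              # None passes through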
diff --git a/py/server/deephaven/stream/table_publisher.py b/py/server/deephaven/stream/table_publisher.py
index cb0d1073de8..a6c65f47885 100644
--- a/py/server/deephaven/stream/table_publisher.py
+++ b/py/server/deephaven/stream/table_publisher.py
@@ -5,13 +5,13 @@
 
 import jpy
 
-from typing import Callable, Dict, Optional, Tuple
+from typing import Callable, Dict, Optional, Tuple, Union, List
 
 from deephaven._wrapper import JObjectWrapper
 from deephaven.column import Column
 from deephaven.dtypes import DType
 from deephaven.execution_context import get_exec_ctx
-from deephaven.jcompat import j_lambda, j_runnable
+from deephaven.jcompat import j_lambda, j_runnable, j_table_definition
 from deephaven.table import Table
 from deephaven.update_graph import UpdateGraph
 
@@ -75,7 +75,7 @@ def publish_failure(self, failure: Exception) -> None:
 
 def table_publisher(
     name: str,
-    col_defs: Dict[str, DType],
+    col_defs: Union[Dict[str, DType], List[Column]],
     on_flush_callback: Optional[Callable[[TablePublisher], None]] = None,
     on_shutdown_callback: Optional[Callable[[], None]] = None,
     update_graph: Optional[UpdateGraph] = None,
@@ -107,12 +107,7 @@ def adapt_callback(_table_publisher: jpy.JType):
 
     j_table_publisher = _JTablePublisher.of(
         name,
-        _JTableDefinition.of(
-            [
-                Column(name=name, data_type=dtype).j_column_definition
-                for name, dtype in col_defs.items()
-            ]
-        ),
+        j_table_definition(col_defs),
        j_lambda(adapt_callback, _JConsumer, None) if on_flush_callback else None,
        j_runnable(on_shutdown_callback) if on_shutdown_callback else None,
        (update_graph or get_exec_ctx().update_graph).j_update_graph,
diff --git a/py/server/tests/test_iceberg.py b/py/server/tests/test_iceberg.py
new file mode 100644
index 00000000000..62ba31e6636
--- /dev/null
+++ b/py/server/tests/test_iceberg.py
@@ -0,0 +1,76 @@
+#
+# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+#
+import jpy
+
+from deephaven import dtypes
+from deephaven.column import Column, ColumnType
+
+from tests.testbase import BaseTestCase
+from deephaven.experimental import s3, iceberg
+
+from deephaven.jcompat import j_map_to_dict, j_list_to_list
+
+_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
+
+class IcebergTestCase(BaseTestCase):
+    """ Test cases for the deephaven.iceberg module (performed locally) """
+
+    def setUp(self):
+        super().setUp()
+
+    def tearDown(self):
+        super().tearDown()
+
+    def test_instruction_create_empty(self):
+        iceberg_instructions = iceberg.IcebergInstructions()
+
+    def test_instruction_create_with_s3_instructions(self):
+        s3_instructions = s3.S3Instructions(region_name="us-east-1",
+                                            access_key_id="some_access_key_id",
+                                            secret_access_key="some_secret_access_key"
+                                            )
+        iceberg_instructions = iceberg.IcebergInstructions(data_instructions=s3_instructions)
+
+    def test_instruction_create_with_col_renames(self):
+        renames = {
+            "old_name_a": "new_name_a",
+            "old_name_b": "new_name_b",
+            "old_name_c": "new_name_c"
+        }
+        iceberg_instructions = iceberg.IcebergInstructions(column_renames=renames)
+
+        col_rename_dict = j_map_to_dict(iceberg_instructions.j_object.columnRenames())
+        self.assertTrue(col_rename_dict["old_name_a"] == "new_name_a")
+        self.assertTrue(col_rename_dict["old_name_b"] == "new_name_b")
+        self.assertTrue(col_rename_dict["old_name_c"] == "new_name_c")
+
+    def test_instruction_create_with_table_definition_dict(self):
+        table_def = {
+            "x": dtypes.int32,
+            "y": dtypes.double,
+            "z": dtypes.double,
+        }
+
+        iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def)
+        col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames())
+        self.assertTrue(col_names[0] == "x")
+        self.assertTrue(col_names[1] == "y")
+        self.assertTrue(col_names[2] == "z")
+
+    def test_instruction_create_with_table_definition_list(self):
+        table_def = [
+            Column(
+                "Partition", dtypes.int32, column_type=ColumnType.PARTITIONING
+            ),
+            Column("x", dtypes.int32),
+            Column("y", dtypes.double),
+            Column("z", dtypes.double),
+        ]
+
+        iceberg_instructions = iceberg.IcebergInstructions(table_definition=table_def)
+        col_names = j_list_to_list(iceberg_instructions.j_object.tableDefinition().get().getColumnNames())
+        self.assertTrue(col_names[0] == "Partition")
+        self.assertTrue(col_names[1] == "x")
+        self.assertTrue(col_names[2] == "y")
+        self.assertTrue(col_names[3] == "z")
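Finally, the widened table_publisher signature in use — a minimal sketch with arbitrary names, showing the new List[Column] form of col_defs alongside the existing Dict form:

    from deephaven import dtypes
    from deephaven.column import Column
    from deephaven.stream.table_publisher import table_publisher

    blink_table, publisher = table_publisher(
        "Example",
        [Column("X", dtypes.int32), Column("Y", dtypes.double)],
    )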