properties) {
this.catalog = catalog;
- this.fileIO = fileIO;
+
+ dataInstructionsProvider = DataInstructionsProviderLoader.create(Map.copyOf(properties));
}
/**
@@ -648,6 +657,54 @@ public Table readTable(
return readTable(TableIdentifier.parse(tableIdentifier), instructions);
}
+ /**
+ * Retrieve a snapshot of an Iceberg table from the Iceberg catalog.
+ *
+ * @param tableIdentifier The table identifier to load
+ * @param tableSnapshotId The snapshot id to load
+ * @return The loaded table
+ * @throws IllegalArgumentException if the snapshot with the given id is not found
+ */
+ private Snapshot getTableSnapshot(@NotNull TableIdentifier tableIdentifier, long tableSnapshotId) {
+ return listSnapshots(tableIdentifier).stream()
+ .filter(snapshot -> snapshot.snapshotId() == tableSnapshotId)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(
+ "Snapshot with id " + tableSnapshotId + " for table " + tableIdentifier + " not found"));
+ }
+
+ /**
+ * Read a static snapshot of an Iceberg table from the Iceberg catalog.
+ *
+ * @param tableIdentifier The table identifier to load
+ * @param tableSnapshotId The snapshot id to load
+ * @return The loaded table
+ */
+ @SuppressWarnings("unused")
+ public Table readTable(@NotNull final TableIdentifier tableIdentifier, final long tableSnapshotId) {
+ // Find the snapshot with the given snapshot id
+ final Snapshot tableSnapshot = getTableSnapshot(tableIdentifier, tableSnapshotId);
+
+ return readTableInternal(tableIdentifier, tableSnapshot, null);
+ }
+
+
+ /**
+ * Read a static snapshot of an Iceberg table from the Iceberg catalog.
+ *
+ * @param tableIdentifier The table identifier to load
+ * @param tableSnapshotId The snapshot id to load
+ * @return The loaded table
+ */
+ @SuppressWarnings("unused")
+ public Table readTable(@NotNull final String tableIdentifier, final long tableSnapshotId) {
+ final TableIdentifier tableId = TableIdentifier.parse(tableIdentifier);
+ // Find the snapshot with the given snapshot id
+ final Snapshot tableSnapshot = getTableSnapshot(tableId, tableSnapshotId);
+
+ return readTableInternal(tableId, tableSnapshot, null);
+ }
+
/**
* Read a static snapshot of an Iceberg table from the Iceberg catalog.
*
@@ -738,11 +795,12 @@ private Table readTableInternal(
if (partitionSpec.isUnpartitioned()) {
// Create the flat layout location key finder
- keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, fileIO, userInstructions);
+ keyFinder = new IcebergFlatLayout(tableDef, table, snapshot, table.io(), userInstructions,
+ dataInstructionsProvider);
} else {
// Create the partitioning column location key finder
- keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, fileIO, partitionSpec,
- userInstructions);
+ keyFinder = new IcebergKeyValuePartitionedLayout(tableDef, table, snapshot, table.io(), partitionSpec,
+ userInstructions, dataInstructionsProvider);
}
refreshService = null;
@@ -772,12 +830,4 @@ private Table readTableInternal(
public Catalog catalog() {
return catalog;
}
-
- /**
- * Returns the underlying Iceberg {@link FileIO} used by this adapter.
- */
- @SuppressWarnings("unused")
- public FileIO fileIO() {
- return fileIO;
- }
}
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java
index bcdda326dca..5dd20f699a9 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTools.java
@@ -3,8 +3,14 @@
//
package io.deephaven.iceberg.util;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.io.FileIO;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Map;
/**
* Tools for accessing tables in the Iceberg table format.
@@ -12,8 +18,114 @@
public abstract class IcebergTools {
@SuppressWarnings("unused")
public static IcebergCatalogAdapter createAdapter(
- final Catalog catalog,
- final FileIO fileIO) {
- return new IcebergCatalogAdapter(catalog, fileIO);
+ final Catalog catalog) {
+ return new IcebergCatalogAdapter(catalog);
+ }
+
+ /**
+ *
+ * Create an Iceberg catalog adapter for an Iceberg catalog created from configuration properties. These properties
+ * map to the Iceberg catalog Java API properties and are used to create the catalog and file IO implementations.
+ *
+ *
+ * The minimal set of properties required to create an Iceberg catalog are:
+ *
+ * - {@code "catalog-impl"} or {@code "type"} - the Java catalog implementation to use. When providing
+ * {@code "catalog-impl"}, the implementing Java class should be provided (e.g.
+ {@code "org.apache.iceberg.rest.RESTCatalog"} or {@code "org.apache.iceberg.aws.glue.GlueCatalog"}). Choices for
+ * {@code "type"} include {@code "hive"}, {@code "hadoop"}, {@code "rest"}, {@code "glue"}, {@code "nessie"},
+ * {@code "jdbc"}.
+ * - {@code "uri"} - the URI of the catalog.
+ *
+ *
+ * Other common properties include:
+ *
+ *
+ * - {@code "warehouse"} - the location of the data warehouse.
+ * - {@code "client.region"} - the region of the AWS client.
+ * - {@code "s3.access-key-id"} - the S3 access key for reading files.
+ * - {@code "s3.secret-access-key"} - the S3 secret access key for reading files.
+ * - {@code "s3.endpoint"} - the S3 endpoint to connect to.
+ *
+ *
+ * Additional properties for the specific catalog should also be included, such as S3-specific properties for
+ * authentication or endpoint overriding.
+ *
+ *
+ * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+ * @param properties a map containing the Iceberg catalog properties to use
+ * @return the Iceberg catalog adapter
+ */
+ @SuppressWarnings("unused")
+ public static IcebergCatalogAdapter createAdapter(
+ @Nullable final String name,
+ @NotNull final Map<String, String> properties) {
+ return createAdapter(name, properties, Map.of());
+ }
+
+ /**
+ *
+ * Create an Iceberg catalog adapter for an Iceberg catalog created from configuration properties. These properties
+ * map to the Iceberg catalog Java API properties and are used to create the catalog and file IO implementations.
+ *
+ *
+ * The minimal set of properties required to create an Iceberg catalog are:
+ *
+ * - {@code "catalog-impl"} or {@code "type"} - the Java catalog implementation to use. When providing
+ * {@code "catalog-impl"}, the implementing Java class should be provided (e.g.
+ {@code "org.apache.iceberg.rest.RESTCatalog"} or {@code "org.apache.iceberg.aws.glue.GlueCatalog"}). Choices for
+ * {@code "type"} include {@code "hive"}, {@code "hadoop"}, {@code "rest"}, {@code "glue"}, {@code "nessie"},
+ * {@code "jdbc"}.
+ * - {@code "uri"} - the URI of the catalog.
+ *
+ *
+ * Other common properties include:
+ *
+ *
+ * - {@code "warehouse"} - the location of the data warehouse.
+ * - {@code "client.region"} - the region of the AWS client.
+ * - {@code "s3.access-key-id"} - the S3 access key for reading files.
+ * - {@code "s3.secret-access-key"} - the S3 secret access key for reading files.
+ * - {@code "s3.endpoint"} - the S3 endpoint to connect to.
+ *
+ *
+ * Additional properties for the specific catalog should also be included, such as S3-specific properties for
+ * authentication or endpoint overriding.
+ *
+ *
+ * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+ * @param properties a map containing the Iceberg catalog properties to use
+ * @param hadoopConfig a map containing Hadoop configuration properties to use
+ * @return the Iceberg catalog adapter
+ */
+ @SuppressWarnings("unused")
+ public static IcebergCatalogAdapter createAdapter(
+ @Nullable final String name,
+ @NotNull final Map<String, String> properties,
+ @NotNull final Map<String, String> hadoopConfig) {
+ // Validate the minimum required properties are set
+ if (!properties.containsKey(CatalogProperties.CATALOG_IMPL)
+ && !properties.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)) {
+ throw new IllegalArgumentException(
+ String.format("Catalog type '%s' or implementation class '%s' is required",
+ CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogProperties.CATALOG_IMPL));
+ }
+ if (!properties.containsKey(CatalogProperties.URI)) {
+ throw new IllegalArgumentException(String.format("Catalog URI property '%s' is required",
+ CatalogProperties.URI));
+ }
+
+ final String catalogUri = properties.get(CatalogProperties.URI);
+ final String catalogName = name != null ? name : "IcebergCatalog-" + catalogUri;
+
+ // Load the Hadoop configuration with the provided properties
+ final Configuration hadoopConf = new Configuration();
+ hadoopConfig.forEach(hadoopConf::set);
+
+ // Create the Iceberg catalog from the properties
+ final Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, properties, hadoopConf);
+
+ return new IcebergCatalogAdapter(catalog, properties);
}
+
}
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java
index e62fbd282e0..3d95032e1f8 100644
--- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestCatalog.java
@@ -10,7 +10,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.io.FileIO;
+import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.util.*;
@@ -19,7 +19,7 @@ public class IcebergTestCatalog implements Catalog, SupportsNamespaces {
private final Map<Namespace, Map<TableIdentifier, Table>> namespaceTableMap;
private final Map<TableIdentifier, Table> tableMap;
- private IcebergTestCatalog(final String path, final FileIO fileIO) {
+ private IcebergTestCatalog(final String path, @NotNull final Map<String, String> properties) {
namespaceTableMap = new HashMap<>();
tableMap = new HashMap<>();
@@ -33,7 +33,7 @@ private IcebergTestCatalog(final String path, final FileIO fileIO) {
if (tableFile.isDirectory()) {
// Second level is table name.
final TableIdentifier tableId = TableIdentifier.of(namespace, tableFile.getName());
- final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), fileIO);
+ final Table table = IcebergTestTable.loadFromMetadata(tableFile.getAbsolutePath(), properties);
// Add it to the maps.
namespaceTableMap.get(namespace).put(tableId, table);
@@ -44,8 +44,8 @@ private IcebergTestCatalog(final String path, final FileIO fileIO) {
}
}
- public static IcebergTestCatalog create(final String path, final FileIO fileIO) {
- return new IcebergTestCatalog(path, fileIO);
+ public static IcebergTestCatalog create(final String path, @NotNull final Map<String, String> properties) {
+ return new IcebergTestCatalog(path, properties);
}
@Override
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java
deleted file mode 100644
index 03be6ca1b5e..00000000000
--- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestFileIO.java
+++ /dev/null
@@ -1,49 +0,0 @@
-//
-// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
-//
-package io.deephaven.iceberg.TestCatalog;
-
-import org.apache.iceberg.inmemory.InMemoryFileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashSet;
-import java.util.Set;
-
-public class IcebergTestFileIO extends InMemoryFileIO {
- private final Set inputFiles;
- private final String matchPrefix;
- private final String replacePrefix;
-
- public IcebergTestFileIO(final String matchPrefix, final String replacePrefix) {
- this.matchPrefix = matchPrefix;
- this.replacePrefix = replacePrefix;
- inputFiles = new HashSet<>();
- }
-
- @Override
- public InputFile newInputFile(String s) {
- if (!inputFiles.contains(s)) {
- try {
- final String replaced = s.replace(matchPrefix, replacePrefix);
- final byte[] data = Files.readAllBytes(Path.of(replaced));
- addFile(s, data);
- inputFiles.add(s);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return super.newInputFile(s);
- }
-
- @Override
- public OutputFile newOutputFile(String s) {
- return null;
- }
-
- @Override
- public void deleteFile(String s) {}
-}
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java
index d1cf5c2ee0e..bcac783def0 100644
--- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergTestTable.java
@@ -3,10 +3,12 @@
//
package io.deephaven.iceberg.TestCatalog;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.ResolvingFileIO;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.shaded.org.apache.commons.lang3.NotImplementedException;
@@ -18,11 +20,14 @@
public class IcebergTestTable implements Table {
private final TableMetadata metadata;
- private final FileIO fileIO;
+ private final Map<String, String> properties;
+ private final Configuration hadoopConf;
+
+ private IcebergTestTable(@NotNull final String path, @NotNull final Map<String, String> properties) {
+ this.properties = properties;
+ hadoopConf = new Configuration();
- private IcebergTestTable(@NotNull final String path, @NotNull final FileIO fileIO) {
final File metadataRoot = new File(path, "metadata");
- this.fileIO = fileIO;
final List metadataFiles = new ArrayList<>();
@@ -44,8 +49,10 @@ private IcebergTestTable(@NotNull final String path, @NotNull final FileIO fileI
}
}
- public static IcebergTestTable loadFromMetadata(@NotNull final String path, @NotNull final FileIO fileIO) {
- return new IcebergTestTable(path, fileIO);
+ public static IcebergTestTable loadFromMetadata(
+ @NotNull final String path,
+ @NotNull final Map<String, String> properties) {
+ return new IcebergTestTable(path, properties);
}
@Override
@@ -214,7 +221,10 @@ public Transaction newTransaction() {
@Override
public FileIO io() {
- return fileIO;
+ final ResolvingFileIO io = new ResolvingFileIO();
+ io.setConf(hadoopConf);
+ io.initialize(properties);
+ return io;
}
@Override
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java
index befd758f980..5d5b550089d 100644
--- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java
@@ -16,6 +16,7 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import java.net.URI;
+import java.util.Map;
public final class SingletonContainers {
@@ -53,6 +54,14 @@ public static S3AsyncClient s3AsyncClient() {
AwsBasicCredentials.create(LOCALSTACK_S3.getAccessKey(), LOCALSTACK_S3.getSecretKey())))
.build();
}
+
+ public static Map<String, String> s3Properties() {
+ return Map.of(
+ "s3.endpoint", LOCALSTACK_S3.getEndpoint().toString(),
+ "client.region", LOCALSTACK_S3.getRegion(),
+ "s3.access-key-id", LOCALSTACK_S3.getAccessKey(),
+ "s3.secret-access-key", LOCALSTACK_S3.getSecretKey());
+ }
}
public static final class MinIO {
@@ -87,5 +96,13 @@ public static S3AsyncClient s3AsyncClient() {
AwsBasicCredentials.create(MINIO.getUserName(), MINIO.getPassword())))
.build();
}
+
+ public static Map<String, String> s3Properties() {
+ return Map.of(
+ "s3.endpoint", MINIO.getS3URL(),
+ "client.region", Region.AWS_GLOBAL.toString(),
+ "s3.access-key-id", MINIO.getUserName(),
+ "s3.secret-access-key", MINIO.getPassword());
+ }
}
}
diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py
index 0a99d3f1880..16fcd08f37d 100644
--- a/py/server/deephaven/experimental/iceberg.py
+++ b/py/server/deephaven/experimental/iceberg.py
@@ -11,8 +11,11 @@
from deephaven.experimental import s3
from deephaven.table import Table, TableDefinition, TableDefinitionLike
+from deephaven.jcompat import j_hashmap
+
_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions")
_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter")
+_JIcebergTools = jpy.get_type("io.deephaven.iceberg.util.IcebergTools")
# IcebergToolsS3 is an optional library
try:
@@ -245,3 +248,74 @@ def adapter_aws_glue(
except Exception as e:
raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
+
+def adapter(
+ name: Optional[str] = None,
+ properties: Optional[Dict[str, str]] = None,
+ hadoopConfig: Optional[Dict[str, str]] = None
+) -> IcebergCatalogAdapter:
+ """
+ Create an Iceberg catalog adapter from configuration properties. These properties map to the Iceberg catalog Java
+ API properties and are used to select the catalog and file IO implementations.
+
+ The minimal set of properties required to create an Iceberg catalog are the following:
+ - `catalog-impl` or `type` - the Java catalog implementation to use. When providing `catalog-impl`, the
+ implementing Java class should be provided (e.g. `org.apache.iceberg.rest.RESTCatalog` or
+ `org.apache.iceberg.aws.glue.GlueCatalog`). Choices for `type` include `hive`, `hadoop`, `rest`, `glue`,
+ `nessie`, `jdbc`.
+ - `uri` - the URI of the catalog
+
+ Other common properties include:
+ - `warehouse` - the root path of the data warehouse.
+ - `client.region` - the region of the AWS client.
+ - `s3.access-key-id` - the S3 access key for reading files.
+ - `s3.secret-access-key` - the S3 secret access key for reading files.
+ - `s3.endpoint` - the S3 endpoint to connect to.
+
+ Example usage #1 - REST catalog with an S3 backend (using MinIO):
+ ```
+ from deephaven.experimental import iceberg
+
+ adapter = iceberg.adapter(name="generic-adapter", properties={
+ "type" : "rest",
+ "uri" : "http://rest:8181",
+ "client.region" : "us-east-1",
+ "s3.access-key-id" : "admin",
+ "s3.secret-access-key" : "password",
+ "s3.endpoint" : "http://minio:9000"
+ })
+ ```
+
+ Example usage #2 - AWS Glue catalog:
+ ```
+ from deephaven.experimental import iceberg
+
+ ## Note: region and credential information are loaded by the catalog from the environment
+ adapter = iceberg.adapter(name="generic-adapter", properties={
+ "type" : "glue",
+ "uri" : "s3://lab-warehouse/sales",
+ })
+ ```
+
+ Args:
+ name (Optional[str]): a descriptive name of the catalog; if omitted the catalog name is inferred from the
+ catalog URI property.
+ properties (Optional[Dict[str, str]]): the properties of the catalog to load
+ hadoopConfig (Optional[Dict[str, str]]): hadoop configuration properties for the catalog to load
+
+ Returns:
+ IcebergCatalogAdapter: the catalog adapter created from the provided properties
+
+ Raises:
+ DHError: If unable to build the catalog adapter
+ """
+
+ try:
+ return IcebergCatalogAdapter(
+ _JIcebergTools.createAdapter(
+ name,
+ j_hashmap(properties if properties is not None else {}),
+ j_hashmap(hadoopConfig if hadoopConfig is not None else {})))
+ except Exception as e:
+ raise DHError(e, "Failed to build Iceberg Catalog Adapter") from e
+