feat: expose Iceberg features to python users (deephaven#5590)
* WIP commit, functional but needs docs.

* Added AWS Glue functionality.

* Update Iceberg and AWS versions, itemize aws dependencies.

* Documentation improvements.
lbooker42 committed Jun 24, 2024
1 parent 7ec5131 commit a13a0dc
Showing 11 changed files with 562 additions and 64 deletions.
buildSrc/src/main/groovy/Classpaths.groovy (2 additions, 2 deletions)
@@ -126,10 +126,10 @@ class Classpaths {
    static final String HADOOP_VERSION = '3.4.0'

    static final String ICEBERG_GROUP = 'org.apache.iceberg'
-    static final String ICEBERG_VERSION = '1.5.0'
+    static final String ICEBERG_VERSION = '1.5.2'

    static final String AWSSDK_GROUP = 'software.amazon.awssdk'
-    static final String AWSSDK_VERSION = '2.23.19'
+    static final String AWSSDK_VERSION = '2.24.5'

    static final String TESTCONTAINER_GROUP = 'org.testcontainers'
    static final String TESTCONTAINER_VERSION = '1.19.4'

extensions/iceberg/s3/build.gradle (3 additions, 1 deletion)
@@ -16,8 +16,10 @@ dependencies {

    implementation project(':extensions-s3')
    implementation "org.apache.iceberg:iceberg-aws"
-    runtimeOnly "org.apache.iceberg:iceberg-aws-bundle"

+    Classpaths.inheritAWSSDK(project)
+    runtimeOnly "software.amazon.awssdk:sts"
+    runtimeOnly "software.amazon.awssdk:glue"

    Classpaths.inheritTestContainers(project)

IcebergToolsS3.java (full path not shown)
@@ -7,6 +7,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
@@ -23,6 +24,20 @@
public class IcebergToolsS3 extends IcebergTools {
    private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";

+    /**
+     * Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
+     * value, the system defaults will be used.
+     *
+     * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+     * @param catalogURI the URI of the Iceberg REST catalog
+     * @param warehouseLocation the location of the S3 datafiles backing the catalog
+     * @param region the AWS region; if omitted, system defaults will be used
+     * @param accessKeyId the AWS access key ID; if omitted, system defaults will be used
+     * @param secretAccessKey the AWS secret access key; if omitted, system defaults will be used
+     * @param endpointOverride the S3 endpoint override; this is useful for testing with an S3-compatible local
+     *        service such as MinIO or LocalStack
+     * @return the Iceberg catalog adapter
+     */
    public static IcebergCatalogAdapter createS3Rest(
            @Nullable final String name,
            @NotNull final String catalogURI,
@@ -53,7 +68,6 @@ public static IcebergCatalogAdapter createS3Rest(
            properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
        }

-        // TODO: create a FileIO interface wrapping the Deephaven S3SeekableByteChannel/Provider
        final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);

        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
@@ -62,4 +76,36 @@ public static IcebergCatalogAdapter createS3Rest(
        return new IcebergCatalogAdapter(catalog, fileIO);
    }

+    /**
+     * Create an Iceberg catalog adapter for an AWS Glue catalog. System defaults will be used to populate the region
+     * and credentials. These can be configured by following the
+     * <a href="https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html">AWS Authentication and
+     * access credentials</a> guide.
+     *
+     * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
+     * @param catalogURI the URI of the AWS Glue catalog
+     * @param warehouseLocation the location of the S3 datafiles backing the catalog
+     * @return the Iceberg catalog adapter
+     */
+    public static IcebergCatalogAdapter createGlue(
+            @Nullable final String name,
+            @NotNull final String catalogURI,
+            @NotNull final String warehouseLocation) {
+
+        // Set up the properties map for the Iceberg catalog
+        final Map<String, String> properties = new HashMap<>();
+
+        final GlueCatalog catalog = new GlueCatalog();
+
+        properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
+        properties.put(CatalogProperties.URI, catalogURI);
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+        final FileIO fileIO = CatalogUtil.loadFileIO(S3_FILE_IO_CLASS, properties, null);
+
+        final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
+        catalog.initialize(catalogName, properties);
+
+        return new IcebergCatalogAdapter(catalog, fileIO);
+    }
}
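
For orientation, a minimal usage sketch of the two factory methods above; it is not part of the commit. The createS3Rest and createGlue signatures come from the diff, while the import paths and every endpoint, credential, and bucket value are illustrative assumptions.

import io.deephaven.iceberg.util.IcebergCatalogAdapter; // assumed package
import io.deephaven.iceberg.util.IcebergToolsS3;        // assumed package

public class IcebergAdapterSketch {
    public static void main(String[] args) {
        // REST catalog over S3-compatible storage (e.g. MinIO or LocalStack);
        // any null argument falls back to system defaults, per the javadoc.
        final IcebergCatalogAdapter restAdapter = IcebergToolsS3.createS3Rest(
                "minio-iceberg",            // name (nullable)
                "http://localhost:8181",    // REST catalog URI (placeholder)
                "s3a://warehouse/wh",       // warehouse location (placeholder)
                "us-east-1",                // region (placeholder)
                "admin",                    // access key ID (placeholder)
                "password",                 // secret access key (placeholder)
                "http://localhost:9000");   // endpoint override for the local service

        // Glue catalog: region and credentials resolve through the AWS default
        // provider chain (environment, config files, instance profile), as the
        // javadoc's linked AWS guide describes.
        final IcebergCatalogAdapter glueAdapter = IcebergToolsS3.createGlue(
                "glue-iceberg",              // name (nullable)
                "s3://my-bucket/catalog",    // catalog URI (placeholder)
                "s3://my-bucket/warehouse"); // warehouse location (placeholder)
    }
}

Passing null for the name falls back to "IcebergCatalog-" + catalogURI, as both factory bodies show.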

Iceberg tools test class (file path not shown)
@@ -129,19 +129,23 @@ public void testListTables() {

        final Namespace ns = Namespace.of("sales");

-        final Collection<TableIdentifier> tables = adapter.listTables(ns);
+        Collection<TableIdentifier> tables = adapter.listTables(ns);
        Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
        Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
        Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
                "tables.contains(sales_partitioned)");
        Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");

-        final Table table = adapter.listTablesAsTable(ns);
+        Table table = adapter.listTablesAsTable(ns);
        Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
        Assert.eqTrue(table.getColumnSource("namespace").getType().equals(String.class), "namespace column type");
        Assert.eqTrue(table.getColumnSource("table_name").getType().equals(String.class), "table_name column type");
        Assert.eqTrue(table.getColumnSource("table_identifier_object").getType().equals(TableIdentifier.class),
                "table_identifier_object column type");
+
+        // Test the string versions of the methods
+        table = adapter.listTablesAsTable("sales");
+        Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
    }

    @Test
@@ -160,14 +164,18 @@ public void testListSnapshots() {
        Assert.eqTrue(snapshotIds.contains(3247344357341484163L), "snapshots.contains(3247344357341484163L)");
        Assert.eqTrue(snapshotIds.contains(1792185872197984875L), "snapshots.contains(1792185872197984875L)");

-        final Table table = adapter.listSnapshotsAsTable(tableIdentifier);
+        Table table = adapter.listSnapshotsAsTable(tableIdentifier);
        Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
        Assert.eqTrue(table.getColumnSource("id").getType().equals(long.class), "id column type");
        Assert.eqTrue(table.getColumnSource("timestamp_ms").getType().equals(long.class), "timestamp_ms column type");
        Assert.eqTrue(table.getColumnSource("operation").getType().equals(String.class), "operation column type");
        Assert.eqTrue(table.getColumnSource("summary").getType().equals(Map.class), "summary column type");
        Assert.eqTrue(table.getColumnSource("snapshot_object").getType().equals(Snapshot.class),
                "snapshot_object column type");
+
+        // Test the string versions of the methods
+        table = adapter.listSnapshotsAsTable("sales.sales_multi");
+        Assert.eq(table.size(), "table.size()", 4, "4 snapshots for sales/sales_multi");
    }

    @Test
@@ -180,7 +188,13 @@ public void testOpenTableA() throws ExecutionException, InterruptedException, TimeoutException {

        final Namespace ns = Namespace.of("sales");
        final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
-        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+        io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

        // Verify we retrieved all the rows.
        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+        table = adapter.readTable("sales.sales_partitioned", instructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
Expand All @@ -196,9 +210,15 @@ public void testOpenTableB() throws ExecutionException, InterruptedException, Ti

final Namespace ns = Namespace.of("sales");
final TableIdentifier tableId = TableIdentifier.of(ns, "sales_multi");
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

// Test the string versions of the methods
table = adapter.readTable("sales.sales_multi", instructions);

// Verify we retrieved all the rows.
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

    @Test
@@ -211,7 +231,13 @@ public void testOpenTableC() throws ExecutionException, InterruptedException, TimeoutException {

        final Namespace ns = Namespace.of("sales");
        final TableIdentifier tableId = TableIdentifier.of(ns, "sales_single");
-        final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
+        io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);

        // Verify we retrieved all the rows.
        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+        table = adapter.readTable("sales.sales_single", instructions);
+
+        // Verify we retrieved all the rows.
+        Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
@@ -563,16 +589,31 @@ public void testOpenTableSnapshotByID() throws ExecutionException, InterruptedException, TimeoutException {
        final List<Snapshot> snapshots = adapter.listSnapshots(tableId);

        // Verify we retrieved all the rows.
-        final io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
+        io.deephaven.engine.table.Table table0 = adapter.readTable(tableId, snapshots.get(0), instructions);
        Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

+        io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
+        Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");
+
+        io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
+        Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");
+
+        io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
+        Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
+
+        // Test the string versions of the methods
+
+        // Verify we retrieved all the rows.
+        table0 = adapter.readTable("sales.sales_multi", snapshots.get(0).snapshotId(), instructions);
+        Assert.eq(table0.size(), "table0.size()", 18266, "18266 rows in the table");

-        final io.deephaven.engine.table.Table table1 = adapter.readTable(tableId, snapshots.get(1), instructions);
+        table1 = adapter.readTable(tableId, snapshots.get(1).snapshotId(), instructions);
        Assert.eq(table1.size(), "table1.size()", 54373, "54373 rows in the table");

-        final io.deephaven.engine.table.Table table2 = adapter.readTable(tableId, snapshots.get(2), instructions);
+        table2 = adapter.readTable(tableId, snapshots.get(2).snapshotId(), instructions);
        Assert.eq(table2.size(), "table2.size()", 72603, "72603 rows in the table");

-        final io.deephaven.engine.table.Table table3 = adapter.readTable(tableId, snapshots.get(3), instructions);
+        table3 = adapter.readTable(tableId, snapshots.get(3).snapshotId(), instructions);
        Assert.eq(table3.size(), "table3.size()", 100_000, "100_000 rows in the table");
    }
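
The added assertions above double as documentation for the new string- and snapshot-id-based overloads on the adapter. A consolidated sketch, assuming an adapter and instructions built as in these tests; IcebergInstructions is an assumed name for the instructions type, which the visible diff never spells out.

import io.deephaven.engine.table.Table;
import io.deephaven.iceberg.util.IcebergCatalogAdapter; // assumed package

final class OverloadTour {
    // Sketch of the overloads exercised by the updated tests above.
    static Table tour(final IcebergCatalogAdapter adapter, final IcebergInstructions instructions) {
        final Table tables = adapter.listTablesAsTable("sales");                   // namespace as a String
        final Table snaps = adapter.listSnapshotsAsTable("sales.sales_multi");     // table id as a String
        final Table latest = adapter.readTable("sales.sales_multi", instructions); // latest snapshot
        final long snapshotId = 3247344357341484163L;                              // an id from testListSnapshots
        return adapter.readTable("sales.sales_multi", snapshotId, instructions);   // snapshot pinned by long id
    }
}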
