[Iceberg] Support S3 path as warehouse dir for Hadoop catalog
hantangwangd committed Dec 19, 2024
1 parent b94b05a commit 7d5ef60
Showing 11 changed files with 332 additions and 34 deletions.
16 changes: 16 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -265,6 +265,22 @@ Property Name Description
Otherwise, it will be ignored.
======================================================= ============================================================= ============

By configuring the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
properties, you can specify an S3 location as the warehouse directory of the Hadoop catalog. This way, both the
metadata and the data of Iceberg tables are maintained on S3 storage. An example configuration includes:

.. code-block:: none

    connector.name=iceberg
    iceberg.catalog.type=hadoop
    iceberg.catalog.warehouse=s3://iceberg_bucket/warehouse
    hive.s3.use-instance-credentials=false
    hive.s3.aws-access-key=accesskey
    hive.s3.aws-secret-key=secretkey
    hive.s3.endpoint=http://192.168.0.103:9878
    hive.s3.path-style-access=true

Configuration Properties
------------------------

@@ -396,8 +396,7 @@ private static boolean isDirectory(PrestoS3ObjectMetadata metadata)
}

        return mediaType.is(X_DIRECTORY_MEDIA_TYPE) ||
-                (mediaType.is(OCTET_STREAM_MEDIA_TYPE)
-                        && metadata.isKeyNeedsPathSeparator()
+                (metadata.isKeyNeedsPathSeparator()
                        && objectMetadata.getContentLength() == 0);
}

@@ -546,8 +545,13 @@ private boolean deleteObject(String key)
@Override
public boolean mkdirs(Path f, FsPermission permission)
{
-        // no need to do anything for S3
-        return true;
+        try {
+            s3.putObject(getBucketName(uri), keyFromPath(f) + PATH_SEPARATOR, "");
+            return true;
+        }
+        catch (AmazonClientException e) {
+            return false;
+        }
}

private enum ListingMode {
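Taken together, the two hunks above let an S3 prefix behave like a directory: mkdirs now writes a zero-byte marker object whose key ends with the path separator, and isDirectory treats any zero-length object that needs a path separator as a directory, regardless of its content type. A minimal sketch of the intended behavior, assuming a Hadoop Configuration that already carries the hive.s3.* credentials and endpoint (the bucket and prefix below are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3MkdirsSketch
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();   // assumed to be configured for the target S3-compatible store
        Path warehouseDir = new Path("s3://iceberg_bucket/warehouse/new_db");

        FileSystem fs = warehouseDir.getFileSystem(conf);
        fs.mkdirs(warehouseDir);   // writes the zero-byte "warehouse/new_db/" marker object
        // The marker has zero length and a key needing a path separator, so it now reads back as a directory.
        System.out.println(fs.getFileStatus(warehouseDir).isDirectory());   // expected: true
    }
}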
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
@@ -601,6 +601,28 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
@@ -29,6 +29,9 @@
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
+import com.facebook.presto.hive.s3.HiveS3Config;
+import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
+import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.metadata.CatalogMetadata;
import com.facebook.presto.metadata.Metadata;
@@ -57,6 +60,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileScanTask;
@@ -90,7 +94,6 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@@ -1604,14 +1607,14 @@ public void testMetadataVersionsMaintainingProperties()
// Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files
assertEquals(defaultTableMetadata.previousFiles().size(), 5);

-        FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location()));
+        FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new Path(settingTable.location()));

// Table `test_table_with_setting_properties`'s all existing metadata files count is 2
-        FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
+        FileStatus[] settingTableFiles = fileSystem.listStatus(new Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(settingTableFiles.length, 2);

// Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6
-        FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
+        FileStatus[] defaultTableFiles = fileSystem.listStatus(new Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(defaultTableFiles.length, 6);
}
finally {
@@ -2018,12 +2021,12 @@ private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<Fil
private void writePositionDeleteToNationTable(Table icebergTable, String dataFilePath, long deletePos)
throws IOException
{
-        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
-        org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI());
+        Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(metadataDir, deleteFileName);
+        Path path = new Path(metadataDir, deleteFileName);
PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
@@ -2050,13 +2053,13 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Ob
private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues, Map<String, Object> partitionValues)
throws Exception
{
-        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
-        org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI());
+        Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
-        Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs))
+        Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
@@ -2077,13 +2080,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Ob
icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
}

-    public static HdfsEnvironment getHdfsEnvironment()
+    protected HdfsEnvironment getHdfsEnvironment()
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
-        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
-                ImmutableSet.of(),
-                hiveClientConfig);
+        HiveS3Config hiveS3Config = new HiveS3Config();
+        return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
+    }
+
+    public static HdfsEnvironment getHdfsEnvironment(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig, HiveS3Config hiveS3Config)
+    {
+        S3ConfigurationUpdater s3ConfigurationUpdater = new PrestoS3ConfigurationUpdater(hiveS3Config);
+        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig, s3ConfigurationUpdater, ignored -> {}),
+                ImmutableSet.of(), hiveClientConfig);
return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
}
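With the new static overload, tests can supply explicit S3 settings instead of the defaults. A hedged sketch of how it might be called (the endpoint and credentials are placeholders, and the fluent setters are assumed to follow the usual hive.s3.* configuration properties):

        HiveS3Config hiveS3Config = new HiveS3Config()
                .setS3AwsAccessKey("accesskey")
                .setS3AwsSecretKey("secretkey")
                .setS3Endpoint("http://127.0.0.1:9000")
                .setS3PathStyleAccess(true);
        // The resulting environment resolves s3:// paths through PrestoS3FileSystem using the settings above.
        HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), hiveS3Config);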

@@ -2105,18 +2114,18 @@ protected Table loadTable(String tableName)

protected Map<String, String> getProperties()
{
-        File metastoreDir = getCatalogDirectory();
+        Path metastoreDir = getCatalogDirectory();
return ImmutableMap.of("warehouse", metastoreDir.toString());
}

-    protected File getCatalogDirectory()
+    protected Path getCatalogDirectory()
{
-        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
switch (catalogType) {
case HIVE:
case HADOOP:
case NESSIE:
-                return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
+                return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
}

throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
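The switch from File to Hadoop Path matters because java.io.File cannot carry an S3 URI: it normalizes the separators and mangles the scheme, while org.apache.hadoop.fs.Path preserves the URI. A small illustration (not part of the commit; output shown for a Unix-like file system):

import java.io.File;
import org.apache.hadoop.fs.Path;

public class WarehouseLocationSketch
{
    public static void main(String[] args)
    {
        // Hadoop Path keeps the URI intact, so the same code path works for local and S3 warehouses.
        System.out.println(new Path("s3://iceberg_bucket/warehouse"));   // s3://iceberg_bucket/warehouse
        // java.io.File collapses the double slash after the scheme.
        System.out.println(new File("s3://iceberg_bucket/warehouse"));   // s3:/iceberg_bucket/warehouse
    }
}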
@@ -43,6 +43,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
@@ -202,13 +203,11 @@ public static DistributedQueryRunner createIcebergQueryRunner(
String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name());
Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath);

-        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
-                .put("iceberg.file-format", format.name())
-                .putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory))
-                .putAll(extraConnectorProperties)
-                .build();
-
-        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
+        Map<String, String> icebergProperties = new HashMap<>();
+        icebergProperties.put("iceberg.file-format", format.name());
+        icebergProperties.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory));
+        icebergProperties.putAll(extraConnectorProperties);
+        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties));

if (addJmxPlugin) {
queryRunner.installPlugin(new JmxPlugin());
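Assembling icebergProperties in a HashMap lets extraConnectorProperties override the defaults returned by getConnectorProperties (for example, redirecting the Hadoop catalog warehouse to S3) without tripping ImmutableMap.Builder's duplicate-key check. A hedged example of connector properties a caller might pass; the endpoint and credentials are placeholders and the exact createIcebergQueryRunner overload is assumed:

        Map<String, String> extraConnectorProperties = ImmutableMap.<String, String>builder()
                .put("iceberg.catalog.type", "hadoop")
                .put("iceberg.catalog.warehouse", "s3://iceberg_bucket/warehouse")   // overrides the local default
                .put("hive.s3.aws-access-key", "accesskey")
                .put("hive.s3.aws-secret-key", "secretkey")
                .put("hive.s3.endpoint", "http://127.0.0.1:9000")
                .put("hive.s3.path-style-access", "true")
                .build();
        // Assumed overload: extra system properties first, extra connector properties second.
        DistributedQueryRunner queryRunner = createIcebergQueryRunner(ImmutableMap.of(), extraConnectorProperties);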
@@ -25,10 +25,13 @@
import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.hive.HiveDwrfEncryptionProvider;
+import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.OrcFileWriterConfig;
+import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.parquet.FileParquetDataSource;
import com.facebook.presto.parquet.cache.MetadataReader;
@@ -61,6 +64,7 @@
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema;
+import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty;
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
@@ -113,7 +117,7 @@ public void setup() throws Exception
this.connectorSession = session.toConnectorSession(connectorId);
TypeManager typeManager = new TestingTypeManager();
this.hdfsContext = new HdfsContext(connectorSession);
-        HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment();
+        HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config());
this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager,
new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION);
}
@@ -0,0 +1,117 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.container;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.facebook.presto.testing.containers.MinIOContainer;
import com.facebook.presto.util.AutoCloseableCloser;
import com.google.common.collect.ImmutableMap;
import org.testcontainers.containers.Network;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Objects.requireNonNull;
import static org.testcontainers.containers.Network.newNetwork;

public class IcebergMinIODataLake
implements Closeable
{
public static final String ACCESS_KEY = "accesskey";
public static final String SECRET_KEY = "secretkey";

private final String bucketName;
private final String warehouseDir;
private final MinIOContainer minIOContainer;

private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AutoCloseableCloser closer = AutoCloseableCloser.create();

public IcebergMinIODataLake(String bucketName, String warehouseDir)
{
this.bucketName = requireNonNull(bucketName, "bucketName is null");
this.warehouseDir = requireNonNull(warehouseDir, "warehouseDir is null");
Network network = closer.register(newNetwork());
this.minIOContainer = closer.register(
MinIOContainer.builder()
.withNetwork(network)
.withEnvVars(ImmutableMap.<String, String>builder()
.put("MINIO_ACCESS_KEY", ACCESS_KEY)
.put("MINIO_SECRET_KEY", SECRET_KEY)
.build())
.build());
}

public void start()
{
if (isStarted()) {
return;
}
try {
this.minIOContainer.start();
AmazonS3 s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(),
"us-east-1"))
.withPathStyleAccessEnabled(true)
.withCredentials(new AWSStaticCredentialsProvider(
new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY)))
.build();
s3Client.createBucket(this.bucketName);
s3Client.putObject(this.bucketName, this.warehouseDir, "");
}
finally {
isStarted.set(true);
}
}

public boolean isStarted()
{
return isStarted.get();
}

public void stop()
{
if (!isStarted()) {
return;
}
try {
closer.close();
}
catch (Exception e) {
throw new RuntimeException("Failed to stop IcebergMinioDataLake", e);
}
finally {
isStarted.set(false);
}
}

public MinIOContainer getMinio()
{
return minIOContainer;
}

@Override
public void close()
throws IOException
{
stop();
}
}
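A hedged usage sketch for the new helper: start the MinIO container, then pass its endpoint together with the fixed test credentials to the connector as hive.s3.* properties. The wiring below mirrors the documentation example above and is illustrative rather than taken from the commit's tests:

        IcebergMinIODataLake dataLake = new IcebergMinIODataLake("test-iceberg-bucket", "warehouse/");
        dataLake.start();
        String endpoint = "http://localhost:" + dataLake.getMinio().getMinioApiEndpoint().getPort();

        Map<String, String> connectorProperties = ImmutableMap.<String, String>builder()
                .put("iceberg.catalog.type", "hadoop")
                .put("iceberg.catalog.warehouse", "s3://test-iceberg-bucket/warehouse/")
                .put("hive.s3.use-instance-credentials", "false")
                .put("hive.s3.aws-access-key", IcebergMinIODataLake.ACCESS_KEY)
                .put("hive.s3.aws-secret-key", IcebergMinIODataLake.SECRET_KEY)
                .put("hive.s3.endpoint", endpoint)
                .put("hive.s3.path-style-access", "true")
                .build();
        // Pass connectorProperties as the extra connector properties when creating the query runner,
        // and call dataLake.stop() (or rely on close()) once the test finishes.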