Commit 6b7e988

Add Support for Iceberg table sort orders

evanvdia committed Jul 19, 2024
1 parent 08eb057 commit 6b7e988

Showing 33 changed files with 1,142 additions and 68 deletions.

44 changes: 43 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -245,7 +245,7 @@ Property Name Description

Example: ``hdfs://nn:8020/warehouse/path``
This property is required if the ``iceberg.catalog.type`` is
``hadoop``.
``hadoop``. Otherwise, it will be ignored.

``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
@@ -1559,3 +1559,45 @@ Map of PrestoDB types to the relevant Iceberg types:
- ``TIMESTAMP WITH ZONE``

No other types are supported.


Sorted Tables
^^^^^^^^^^^^^

The Iceberg connector supports the creation of sorted tables.
Data in an Iceberg table is sorted within each file as it is written.

Sorted Iceberg tables can significantly improve query performance.
Sorting is most beneficial when the sorted columns have high
cardinality and are used as filters in selective reads.

Configure the sort order with the ``sorted_by`` table property, which takes an array of
one or more columns to sort by.
The following example creates a table with the ``sorted_by`` property and sorts each file
by the ``join_date`` column.

.. code-block:: text

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
sorted_by = ARRAY['join_date']
)
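
Because Iceberg keeps per-file minimum and maximum values for each column, sorting by
``join_date`` lets selective queries on that column skip files whose value ranges do not
overlap the predicate. A hypothetical lookup (illustrative only, not part of the original
documentation)::

    SELECT emp_name
    FROM emp.employees.employee
    WHERE join_date >= DATE '2023-01-01'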

Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
partitioning = ARRAY['month(join_date)'],
sorted_by = ARRAY['join_date']
)

To disable sorted writing, set the session property
``sorted_writing_enabled`` to ``false``.
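
For example, to disable sorted writing in the current session (assuming the Iceberg
catalog is named ``iceberg``)::

    SET SESSION iceberg.sorted_writing_enabled = false;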

@@ -19,6 +19,7 @@
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.hive.s3.S3FileSystemType;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
@@ -30,7 +31,6 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -45,6 +45,7 @@
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.HiveSessionProperties.INSERT_EXISTING_PARTITIONS_BEHAVIOR;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
@@ -76,7 +77,7 @@ public class HiveClientConfig
private int splitLoaderConcurrency = 4;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 100;
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private NodeSelectionStrategy nodeSelectionStrategy = NO_PREFERENCE;
private boolean recursiveDirWalkerEnabled;

private int maxConcurrentFileRenames = 20;
@@ -102,7 +103,6 @@ public class HiveClientConfig
private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
private InsertExistingPartitionsBehavior insertExistingPartitionsBehavior;
private int maxPartitionsPerWriter = 100;
private int maxOpenSortFiles = 50;
private int writeValidationThreads = 16;

private List<String> resourceConfigFiles = ImmutableList.of();
@@ -279,17 +279,15 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho
return this;
}

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
public NodeSelectionStrategy getNodeSelectionStrategy()
{
return writerSortBufferSize;
return nodeSelectionStrategy;
}

@Config("hive.writer-sort-buffer-size")
public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
@Config("hive.node-selection-strategy")
public HiveClientConfig setNodeSelectionStrategy(NodeSelectionStrategy nodeSelectionStrategy)
{
this.writerSortBufferSize = writerSortBufferSize;
this.nodeSelectionStrategy = nodeSelectionStrategy;
return this;
}

@@ -695,22 +693,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

public int getWriteValidationThreads()
{
return writeValidationThreads;

@@ -129,7 +129,7 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
configBinder(binder).bindConfig(HiveClientConfig.class);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);

@@ -91,6 +91,7 @@ public HivePageSinkProvider(
TypeManager typeManager,
HiveClientConfig hiveClientConfig,
MetastoreClientConfig metastoreClientConfig,
SortingFileWriterConfig sortingFileWriterConfig,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@ public HivePageSinkProvider(
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
this.locationService = requireNonNull(locationService, "locationService is null");
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));

@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

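/**
 * Writer sort settings moved out of {@code HiveClientConfig} so that they can be bound under a
 * connector-specific prefix (this commit binds them under the {@code "hive"} prefix, preserving
 * the existing {@code hive.writer-sort-buffer-size} and {@code hive.max-open-sort-files} names).
 */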
public class SortingFileWriterConfig
{
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private int maxOpenSortFiles = 50;

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}
}
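
A sketch of setting the relocated properties in a Hive catalog properties file (example
values only; per the validation annotations above, the buffer size must be between 1MB and
1GB and the open-file count between 2 and 1000):

    hive.writer-sort-buffer-size=128MB
    hive.max-open-sort-files=100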

@@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
FUNCTION_AND_TYPE_MANAGER,
getHiveClientConfig(),
getMetastoreClientConfig(),
getSortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1099,8 +1100,6 @@
protected HiveClientConfig getHiveClientConfig()
{
return new HiveClientConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE))
.setTemporaryTableSchema(database)
.setCreateEmptyBucketFilesForTemporaryTable(false);
}
@@ -1110,6 +1109,12 @@ protected HiveCommonClientConfig getHiveCommonClientConfig()
return new HiveCommonClientConfig();
}

protected SortingFileWriterConfig getSortingFileWriterConfig()
{
return new SortingFileWriterConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
}
protected CacheConfig getCacheConfig()
{
return new CacheConfig().setCacheQuotaScope(CACHE_SCOPE).setDefaultCacheQuota(DEFAULT_QUOTA_SIZE);
@@ -3109,7 +3114,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
true);
assertThat(listAllDataFiles(context, path))
.filteredOn(file -> file.contains(".tmp-sort"))
.size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);

// finish the write
Collection<Slice> fragments = getFutureValue(sink.finish());

@@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
FUNCTION_AND_TYPE_MANAGER,
config,
metastoreClientConfig,
new SortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,

@@ -17,6 +17,7 @@
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.hive.HiveClientConfig.HdfsAuthenticationType;
import com.facebook.presto.hive.s3.S3FileSystemType;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
@@ -37,6 +38,7 @@
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.TestHiveUtil.nonDefaultTimeZone;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -60,7 +62,7 @@ public void testDefaults()
.setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
.setSplitLoaderConcurrency(4)
.setDomainCompactionThreshold(100)
.setWriterSortBufferSize(new DataSize(64, Unit.MEGABYTE))
.setNodeSelectionStrategy(NodeSelectionStrategy.valueOf("NO_PREFERENCE"))
.setMaxConcurrentFileRenames(20)
.setMaxConcurrentZeroRowFileCreations(20)
.setRecursiveDirWalkerEnabled(false)
@@ -82,7 +84,6 @@ public void testDefaults()
.setFailFastOnInsertIntoImmutablePartitionsEnabled(true)
.setSortedWritingEnabled(true)
.setMaxPartitionsPerWriter(100)
.setMaxOpenSortFiles(50)
.setWriteValidationThreads(16)
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
.setUseOrcColumnNames(false)
@@ -195,7 +196,6 @@ public void testExplicitPropertyMappings()
.put("hive.max-initial-split-size", "16MB")
.put("hive.split-loader-concurrency", "1")
.put("hive.domain-compaction-threshold", "42")
.put("hive.writer-sort-buffer-size", "13MB")
.put("hive.recursive-directories", "true")
.put("hive.storage-format", "SEQUENCEFILE")
.put("hive.compression-codec", "NONE")
@@ -207,8 +207,8 @@ public void testExplicitPropertyMappings()
.put("hive.insert-overwrite-immutable-partitions-enabled", "true")
.put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false")
.put("hive.max-partitions-per-writers", "222")
.put("hive.max-open-sort-files", "333")
.put("hive.write-validation-threads", "11")
.put("hive.node-selection-strategy", "HARD_AFFINITY")
.put("hive.max-concurrent-file-renames", "100")
.put("hive.max-concurrent-zero-row-file-creations", "100")
.put("hive.assume-canonical-partition-keys", "true")
@@ -313,7 +313,7 @@ public void testExplicitPropertyMappings()
.setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
.setSplitLoaderConcurrency(1)
.setDomainCompactionThreshold(42)
.setWriterSortBufferSize(new DataSize(13, Unit.MEGABYTE))
.setNodeSelectionStrategy(HARD_AFFINITY)
.setMaxConcurrentFileRenames(100)
.setMaxConcurrentZeroRowFileCreations(100)
.setRecursiveDirWalkerEnabled(true)
Expand All @@ -331,7 +331,6 @@ public void testExplicitPropertyMappings()
.setInsertOverwriteImmutablePartitionEnabled(true)
.setFailFastOnInsertIntoImmutablePartitionsEnabled(false)
.setMaxPartitionsPerWriter(222)
.setMaxOpenSortFiles(333)
.setWriteValidationThreads(11)
.setDomainSocketPath("/foo")
.setS3FileSystemType(S3FileSystemType.EMRFS)