Add Support for Iceberg table sort orders
evanvdia committed Jul 12, 2024
1 parent 66a7fc9 commit 5e60f75
Showing 32 changed files with 1,139 additions and 71 deletions.
44 changes: 43 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -227,7 +227,7 @@ Property Name Description

     Example: ``hdfs://nn:8020/warehouse/path``
     This property is required if the ``iceberg.catalog.type`` is
-    ``hadoop``.
+    ``hadoop``. Otherwise, it will be ignored.

``iceberg.catalog.cached-catalog-num``    The number of Iceberg catalogs to cache. This property is    ``10``
                                          required if the ``iceberg.catalog.type`` is ``hadoop``.
@@ -1531,3 +1531,45 @@ Map of PrestoDB types to the relevant Iceberg types:
- ``TIMESTAMP WITH ZONE``

No other types are supported.


Sorted Tables
^^^^^^^^^^^^^

The Iceberg connector supports the creation of sorted tables.
Data in a sorted table is sorted within each file as it is written.

Sorted Iceberg tables can significantly improve query performance.
Sorting is particularly beneficial when the sorted columns have
high cardinality and are used as filters for selective reads.

Configure the sort order with the ``sorted_by`` table property, which accepts an array of
one or more columns to sort by.
The following example creates a table with the ``sorted_by`` property and sorts each file
by the ``join_date`` column.

.. code-block:: sql

    CREATE TABLE emp.employees.employee (
        emp_id BIGINT,
        emp_name VARCHAR,
        join_date DATE,
        country VARCHAR)
    WITH (
        sorted_by = ARRAY['join_date']
    )

Sorting can be combined with partitioning on the same column. For example::

    CREATE TABLE emp.employees.employee (
        emp_id BIGINT,
        emp_name VARCHAR,
        join_date DATE,
        country VARCHAR)
    WITH (
        partitioning = ARRAY['month(join_date)'],
        sorted_by = ARRAY['join_date']
    )

To disable sorted writing, set the session property
``sorted_writing_enabled`` to ``false``.
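
As an illustrative sketch (not part of the committed docs): the session property
is scoped to the catalog name, assumed here to be ``emp`` as in the examples
above::

    SET SESSION emp.sorted_writing_enabled = false;

    INSERT INTO emp.employees.employee
    VALUES (1, 'Alice', DATE '2023-04-01', 'US');

Writes issued after the ``SET SESSION`` statement skip the per-file sort step
for the remainder of the session.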
@@ -19,6 +19,7 @@
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.hive.s3.S3FileSystemType;
+import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
@@ -30,7 +31,6 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
-import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -45,6 +45,7 @@
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.HiveSessionProperties.INSERT_EXISTING_PARTITIONS_BEHAVIOR;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
+import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -75,7 +76,7 @@ public class HiveClientConfig
    private int splitLoaderConcurrency = 4;
    private DataSize maxInitialSplitSize;
    private int domainCompactionThreshold = 100;
-    private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
+    private NodeSelectionStrategy nodeSelectionStrategy = NO_PREFERENCE;
    private boolean recursiveDirWalkerEnabled;

    private int maxConcurrentFileRenames = 20;
@@ -101,7 +102,6 @@
    private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
    private InsertExistingPartitionsBehavior insertExistingPartitionsBehavior;
    private int maxPartitionsPerWriter = 100;
-    private int maxOpenSortFiles = 50;
    private int writeValidationThreads = 16;

    private List<String> resourceConfigFiles = ImmutableList.of();
@@ -278,17 +278,15 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho
        return this;
    }

-    @MinDataSize("1MB")
-    @MaxDataSize("1GB")
-    public DataSize getWriterSortBufferSize()
+    public NodeSelectionStrategy getNodeSelectionStrategy()
    {
-        return writerSortBufferSize;
+        return nodeSelectionStrategy;
    }

-    @Config("hive.writer-sort-buffer-size")
-    public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
+    @Config("hive.node-selection-strategy")
+    public HiveClientConfig setNodeSelectionStrategy(NodeSelectionStrategy nodeSelectionStrategy)
    {
-        this.writerSortBufferSize = writerSortBufferSize;
+        this.nodeSelectionStrategy = nodeSelectionStrategy;
        return this;
    }

@@ -694,22 +692,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
        this.maxPartitionsPerWriter = maxPartitionsPerWriter;
        return this;
    }
-
-    @Min(2)
-    @Max(1000)
-    public int getMaxOpenSortFiles()
-    {
-        return maxOpenSortFiles;
-    }
-
-    @Config("hive.max-open-sort-files")
-    @ConfigDescription("Maximum number of writer temporary files to read in one pass")
-    public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
-    {
-        this.maxOpenSortFiles = maxOpenSortFiles;
-        return this;
-    }
-
    public int getWriteValidationThreads()
    {
        return writeValidationThreads;
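
For illustration only (not part of this commit): the new strategy property would
be set in the Hive catalog's configuration, with ``NO_PREFERENCE`` as the default
and ``HARD_AFFINITY`` as the value exercised in the updated config test below::

    # Sketch of etc/catalog/hive.properties (conventional location, assumed here)
    hive.node-selection-strategy=HARD_AFFINITY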
@@ -129,7 +129,7 @@ public void configure(Binder binder)
        binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
        newSetBinder(binder, DynamicConfigurationProvider.class);
        configBinder(binder).bindConfig(HiveClientConfig.class);
-
+        configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
        binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
        binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
        binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);
@@ -91,6 +91,7 @@ public HivePageSinkProvider(
            TypeManager typeManager,
            HiveClientConfig hiveClientConfig,
            MetastoreClientConfig metastoreClientConfig,
+            SortingFileWriterConfig sortingFileWriterConfig,
            LocationService locationService,
            JsonCodec<PartitionUpdate> partitionUpdateCodec,
            SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@
        this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
-        this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
-        this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
+        this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
+        this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
        this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
        this.locationService = requireNonNull(locationService, "locationService is null");
        this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));
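
A minimal sketch of how a call site now supplies the split-out settings
(illustrative only; the values mirror the test helper added further below)::

    // Hypothetical fragment: the sort-writer settings travel in their own
    // config object instead of HiveClientConfig.
    SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig()
            .setMaxOpenSortFiles(10)
            .setWriterSortBufferSize(new DataSize(100, KILOBYTE));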
@@ -0,0 +1,60 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class SortingFileWriterConfig
{
    private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
    private int maxOpenSortFiles = 50;

    @MinDataSize("1MB")
    @MaxDataSize("1GB")
    public DataSize getWriterSortBufferSize()
    {
        return writerSortBufferSize;
    }

    @Config("writer-sort-buffer-size")
    public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
    {
        this.writerSortBufferSize = writerSortBufferSize;
        return this;
    }

    @Min(2)
    @Max(1000)
    public int getMaxOpenSortFiles()
    {
        return maxOpenSortFiles;
    }

    @Config("max-open-sort-files")
    @ConfigDescription("Maximum number of writer temporary files to read in one pass")
    public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
    {
        this.maxOpenSortFiles = maxOpenSortFiles;
        return this;
    }
}
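
Since HiveClientModule binds this class with the ``"hive"`` prefix, the two
settings keep their hive-qualified property names. An illustrative sketch of
the resulting configuration, using the defaults and validation bounds from the
class above (not part of the commit)::

    # writer-sort-buffer-size accepts 1MB to 1GB; default shown
    hive.writer-sort-buffer-size=64MB
    # max-open-sort-files accepts 2 to 1000; default shown
    hive.max-open-sort-files=50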
@@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
                FUNCTION_AND_TYPE_MANAGER,
                getHiveClientConfig(),
                getMetastoreClientConfig(),
+                getSortingFileWriterConfig(),
                locationService,
                HiveTestUtils.PARTITION_UPDATE_CODEC,
                HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1099,8 +1100,6 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
    protected HiveClientConfig getHiveClientConfig()
    {
        return new HiveClientConfig()
-                .setMaxOpenSortFiles(10)
-                .setWriterSortBufferSize(new DataSize(100, KILOBYTE))
                .setTemporaryTableSchema(database)
                .setCreateEmptyBucketFilesForTemporaryTable(false);
    }
@@ -1110,6 +1109,12 @@ protected HiveCommonClientConfig getHiveCommonClientConfig()
        return new HiveCommonClientConfig();
    }

+    protected SortingFileWriterConfig getSortingFileWriterConfig()
+    {
+        return new SortingFileWriterConfig()
+                .setMaxOpenSortFiles(10)
+                .setWriterSortBufferSize(new DataSize(100, KILOBYTE));
+    }
    protected CacheConfig getCacheConfig()
    {
        return new CacheConfig().setCacheQuotaScope(CACHE_SCOPE).setDefaultCacheQuota(DEFAULT_QUOTA_SIZE);
@@ -3109,7 +3114,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
                true);
        assertThat(listAllDataFiles(context, path))
                .filteredOn(file -> file.contains(".tmp-sort"))
-                .size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
+                .size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);

        // finish the write
        Collection<Slice> fragments = getFutureValue(sink.finish());
@@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
                FUNCTION_AND_TYPE_MANAGER,
                config,
                metastoreClientConfig,
+                new SortingFileWriterConfig(),
                locationService,
                HiveTestUtils.PARTITION_UPDATE_CODEC,
                HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -17,6 +17,7 @@
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.hive.HiveClientConfig.HdfsAuthenticationType;
import com.facebook.presto.hive.s3.S3FileSystemType;
+import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
@@ -37,6 +38,7 @@
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.TestHiveUtil.nonDefaultTimeZone;
+import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

@@ -59,7 +61,7 @@ public void testDefaults()
                .setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
                .setSplitLoaderConcurrency(4)
                .setDomainCompactionThreshold(100)
-                .setWriterSortBufferSize(new DataSize(64, Unit.MEGABYTE))
+                .setNodeSelectionStrategy(NodeSelectionStrategy.valueOf("NO_PREFERENCE"))
                .setMaxConcurrentFileRenames(20)
                .setMaxConcurrentZeroRowFileCreations(20)
                .setRecursiveDirWalkerEnabled(false)
@@ -81,7 +83,6 @@
                .setFailFastOnInsertIntoImmutablePartitionsEnabled(true)
                .setSortedWritingEnabled(true)
                .setMaxPartitionsPerWriter(100)
-                .setMaxOpenSortFiles(50)
                .setWriteValidationThreads(16)
                .setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
                .setUseOrcColumnNames(false)
@@ -194,7 +195,6 @@ public void testExplicitPropertyMappings()
                .put("hive.max-initial-split-size", "16MB")
                .put("hive.split-loader-concurrency", "1")
                .put("hive.domain-compaction-threshold", "42")
-                .put("hive.writer-sort-buffer-size", "13MB")
                .put("hive.recursive-directories", "true")
                .put("hive.storage-format", "SEQUENCEFILE")
                .put("hive.compression-codec", "NONE")
@@ -206,8 +206,8 @@
                .put("hive.insert-overwrite-immutable-partitions-enabled", "true")
                .put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false")
                .put("hive.max-partitions-per-writers", "222")
-                .put("hive.max-open-sort-files", "333")
                .put("hive.write-validation-threads", "11")
+                .put("hive.node-selection-strategy", "HARD_AFFINITY")
                .put("hive.max-concurrent-file-renames", "100")
                .put("hive.max-concurrent-zero-row-file-creations", "100")
                .put("hive.assume-canonical-partition-keys", "true")
@@ -312,7 +312,7 @@ public void testExplicitPropertyMappings()
                .setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
                .setSplitLoaderConcurrency(1)
                .setDomainCompactionThreshold(42)
-                .setWriterSortBufferSize(new DataSize(13, Unit.MEGABYTE))
+                .setNodeSelectionStrategy(HARD_AFFINITY)
                .setMaxConcurrentFileRenames(100)
                .setMaxConcurrentZeroRowFileCreations(100)
                .setRecursiveDirWalkerEnabled(true)
@@ -330,7 +330,6 @@
                .setInsertOverwriteImmutablePartitionEnabled(true)
                .setFailFastOnInsertIntoImmutablePartitionsEnabled(false)
                .setMaxPartitionsPerWriter(222)
-                .setMaxOpenSortFiles(333)
                .setWriteValidationThreads(11)
                .setDomainSocketPath("/foo")
                .setS3FileSystemType(S3FileSystemType.EMRFS)