Add Support for Iceberg table sort orders

evanvdia committed Mar 12, 2024
1 parent c1f0066 commit 30ac341
Showing 33 changed files with 1,147 additions and 93 deletions.
60 changes: 47 additions & 13 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -413,15 +413,14 @@ Metastore cache only caches schema and table names. Other metadata would be fetc
hive.metastore-cache-maximum-size=10000000

Extra Hidden Metadata Columns
-----------------------------

The Iceberg connector exposes extra hidden metadata columns. You can query these
as part of a SQL query by including them in your SELECT statement.

``$path`` column
^^^^^^^^^^^^^^^^
* ``$path``: Full file system path name of the file for this row

.. code-block:: sql

SELECT "$path", regionkey FROM "ctas_nation";
@@ -435,7 +434,6 @@
``$data_sequence_number`` column
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``$data_sequence_number``: The Iceberg data sequence number in which this row was added

.. code-block:: sql

SELECT "$data_sequence_number", regionkey FROM "ctas_nation";
@@ -455,7 +453,6 @@ as a part of a SQL query by appending name to the table.
``$properties`` Table
^^^^^^^^^^^^^^^^^^^^^
* ``$properties`` : General properties of the given table

.. code-block:: sql

SELECT * FROM "ctas_nation$properties";
@@ -469,7 +466,6 @@ as a part of a SQL query by appending name to the table.
``$history`` Table
^^^^^^^^^^^^^^^^^^
* ``$history`` : History of table state changes

.. code-block:: sql

SELECT * FROM "ctas_nation$history";
@@ -483,7 +479,6 @@ as a part of a SQL query by appending name to the table.
``$snapshots`` Table
^^^^^^^^^^^^^^^^^^^^
* ``$snapshots`` : Details about the table snapshots. For more information see `Snapshots <https://iceberg.apache.org/spec/#snapshots>`_ in the Iceberg Table Spec.

.. code-block:: sql

SELECT * FROM "ctas_nation$snapshots";
@@ -497,7 +492,6 @@ as a part of a SQL query by appending name to the table.
``$manifests`` Table
^^^^^^^^^^^^^^^^^^^^
* ``$manifests`` : Details about the manifests of different table snapshots. For more information see `Manifests <https://iceberg.apache.org/spec/#manifests>`_ in the Iceberg Table Spec.

.. code-block:: sql

SELECT * FROM "ctas_nation$manifests";
@@ -511,7 +505,6 @@ as a part of a SQL query by appending name to the table.
``$partitions`` Table
^^^^^^^^^^^^^^^^^^^^^
* ``$partitions`` : Detailed partition information for the table

.. code-block:: sql

SELECT * FROM "ctas_nation$partitions";
@@ -525,7 +518,6 @@ as a part of a SQL query by appending name to the table.
``$files`` Table
^^^^^^^^^^^^^^^^
* ``$files`` : Overview of data files in the current snapshot of the table

.. code-block:: sql

SELECT * FROM "ctas_nation$files";
@@ -862,7 +854,7 @@ Drop the schema ``iceberg.web``::
DROP SCHEMA iceberg.web

Register table
^^^^^^^^^^^^^^

Iceberg tables for which table data and metadata already exist in the
file system can be registered with the catalog using the ``register_table``
@@ -894,7 +886,7 @@ in the case where a specific metadata file contains the targeted table state::
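
    -- illustrative sketch, not taken from this commit: the schema, table,
    -- location, and metadata file name below are all placeholders
    CALL iceberg.system.register_table('web', 'page_views', 'hdfs://warehouse/web/page_views', '00000-example.metadata.json')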
using the Hive connector will fail.

Unregister table
^^^^^^^^^^^^^^^^

Iceberg tables can be unregistered from the catalog using the ``unregister_table``
procedure on the catalog's ``system`` schema::
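
    -- illustrative sketch, not taken from this commit: the schema and
    -- table names are placeholders
    CALL iceberg.system.unregister_table('web', 'page_views')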
@@ -1168,8 +1160,8 @@ Type mapping
------------

PrestoDB and Iceberg each have data types that the other does not support. When using Iceberg to read or write data, Presto converts
each Iceberg data type to the corresponding Presto data type, and each Presto data type to the comparable Iceberg data type.
The following tables detail the specific type mappings between PrestoDB and Iceberg.
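
For instance, Presto's nested types correspond to Iceberg's nested types, as in this
illustrative table definition (the table and column names are examples only):

.. code-block:: sql

    CREATE TABLE iceberg.web.events (
        id BIGINT,                          -- Iceberg long
        tags ARRAY(VARCHAR),                -- Iceberg list<string>
        attributes MAP(VARCHAR, VARCHAR),   -- Iceberg map<string, string>
        geo ROW(lat DOUBLE, lon DOUBLE)     -- Iceberg struct<lat, lon>
    )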

Iceberg to PrestoDB type mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -1257,3 +1249,45 @@ Map of PrestoDB types to the relevant Iceberg types:
- ``TIMESTAMP WITH ZONE``

No other types are supported.


Sorted Tables
^^^^^^^^^^^^^

The Iceberg connector supports the creation of sorted tables.
Data in a sorted Iceberg table is sorted within each file as it is written.

Sorted Iceberg tables can significantly improve query performance.
Sorting is particularly beneficial when the sorted columns have
high cardinality and are used as a filter for selective reads.

Configure the sort order with the ``sorted_by`` table property, specifying an array of
one or more columns to use for sorting.
The following example creates a table with the ``sorted_by`` property and sorts each
written file by the ``join_date`` column.

.. code-block:: sql

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
sorted_by = ARRAY['join_date']
)
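
A query that filters on the sort column can then skip non-matching data efficiently
during selective reads. For example (the date range is illustrative):

.. code-block:: sql

    SELECT emp_name
    FROM emp.employees.employee
    WHERE join_date >= DATE '2023-01-01' AND join_date < DATE '2023-02-01'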

Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE emp.employees.employee (
emp_id BIGINT,
emp_name VARCHAR,
join_date DATE,
country VARCHAR)
WITH (
partitioning = ARRAY['month(join_date)'],
sorted_by = ARRAY['join_date']
)

To disable sorted writing, set the session property
``sorted_writing_enabled`` to ``false``.
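
For example, assuming the Iceberg catalog is named ``iceberg`` (connector session
properties are qualified by the catalog name):

.. code-block:: sql

    SET SESSION iceberg.sorted_writing_enabled = false;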
@@ -19,6 +19,7 @@
import com.facebook.airlift.configuration.LegacyConfig;
import com.facebook.drift.transport.netty.codec.Protocol;
import com.facebook.presto.hive.s3.S3FileSystemType;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
@@ -30,7 +31,6 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -45,6 +45,7 @@
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.HiveSessionProperties.INSERT_EXISTING_PARTITIONS_BEHAVIOR;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -75,7 +76,7 @@ public class HiveClientConfig
private int splitLoaderConcurrency = 4;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 100;
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private NodeSelectionStrategy nodeSelectionStrategy = NO_PREFERENCE;
private boolean recursiveDirWalkerEnabled;

private int maxConcurrentFileRenames = 20;
@@ -101,7 +102,6 @@ public class HiveClientConfig
private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
private InsertExistingPartitionsBehavior insertExistingPartitionsBehavior;
private int maxPartitionsPerWriter = 100;
private int maxOpenSortFiles = 50;
private int writeValidationThreads = 16;

private List<String> resourceConfigFiles = ImmutableList.of();
@@ -272,17 +272,15 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho
return this;
}

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
public NodeSelectionStrategy getNodeSelectionStrategy()
{
return writerSortBufferSize;
return nodeSelectionStrategy;
}

@Config("hive.writer-sort-buffer-size")
public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
@Config("hive.node-selection-strategy")
public HiveClientConfig setNodeSelectionStrategy(NodeSelectionStrategy nodeSelectionStrategy)
{
this.writerSortBufferSize = writerSortBufferSize;
this.nodeSelectionStrategy = nodeSelectionStrategy;
return this;
}

@@ -688,22 +686,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

public int getWriteValidationThreads()
{
return writeValidationThreads;
@@ -129,7 +129,7 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
configBinder(binder).bindConfig(HiveClientConfig.class);

configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);
@@ -91,6 +91,7 @@ public HivePageSinkProvider(
TypeManager typeManager,
HiveClientConfig hiveClientConfig,
MetastoreClientConfig metastoreClientConfig,
SortingFileWriterConfig sortingFileWriterConfig,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
this.locationService = requireNonNull(locationService, "locationService is null");
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class SortingFileWriterConfig
{
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private int maxOpenSortFiles = 50;

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}
}
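
Because HiveClientModule binds this class with the "hive" prefix (the
configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive") call above), the
resulting catalog properties keep the names previously defined on HiveClientConfig. A
sketch of a catalog file setting them, using the defaults from this class:

# etc/catalog/hive.properties -- values shown are the class defaults
hive.writer-sort-buffer-size=64MB
hive.max-open-sort-files=50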
@@ -1071,6 +1071,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
FUNCTION_AND_TYPE_MANAGER,
getHiveClientConfig(),
getMetastoreClientConfig(),
getSortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1097,8 +1098,6 @@
protected HiveClientConfig getHiveClientConfig()
{
return new HiveClientConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE))
.setTemporaryTableSchema(database)
.setCreateEmptyBucketFilesForTemporaryTable(false);
}
@@ -1108,6 +1107,12 @@ protected HiveCommonClientConfig getHiveCommonClientConfig()
return new HiveCommonClientConfig();
}

protected SortingFileWriterConfig getSortingFileWriterConfig()
{
return new SortingFileWriterConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
}
protected CacheConfig getCacheConfig()
{
return new CacheConfig().setCacheQuotaScope(CACHE_SCOPE).setDefaultCacheQuota(DEFAULT_QUOTA_SIZE);
@@ -3080,7 +3085,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
Set<String> files = listAllDataFiles(context, path);
assertThat(listAllDataFiles(context, path))
.filteredOn(file -> file.contains(".tmp-sort"))
.size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);

// finish the write
Collection<Slice> fragments = getFutureValue(sink.finish());
@@ -253,6 +253,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
FUNCTION_AND_TYPE_MANAGER,
config,
metastoreClientConfig,
new SortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,