From 7dc619f1199ae74f34b353e2b9d9b304bd2cf25c Mon Sep 17 00:00:00 2001 From: Nidhin Varghese Date: Tue, 20 Feb 2024 13:57:48 +0530 Subject: [PATCH] Add Support for Iceberg table sort orders --- .../src/main/sphinx/connector/iceberg.rst | 44 +++++- .../presto/hive/HiveClientConfig.java | 34 +---- .../presto/hive/HiveClientModule.java | 2 +- .../presto/hive/HivePageSinkProvider.java | 5 +- .../presto/hive/SortingFileWriterConfig.java | 60 ++++++++ .../presto/hive/AbstractTestHiveClient.java | 11 +- .../hive/AbstractTestHiveFileSystem.java | 1 + .../presto/hive/TestHiveClientConfig.java | 11 +- .../presto/hive/TestHivePageSink.java | 12 +- .../hive/TestSortingFileWriterConfig.java | 50 +++++++ .../iceberg/IcebergAbstractMetadata.java | 42 +++++- .../presto/iceberg/IcebergCommonModule.java | 7 + .../presto/iceberg/IcebergConfig.java | 14 ++ .../presto/iceberg/IcebergErrorCode.java | 4 +- .../presto/iceberg/IcebergHiveMetadata.java | 10 +- .../presto/iceberg/IcebergNativeMetadata.java | 5 +- .../presto/iceberg/IcebergPageSink.java | 107 ++++++++++++- .../iceberg/IcebergPageSinkProvider.java | 31 +++- .../iceberg/IcebergSessionProperties.java | 11 ++ .../iceberg/IcebergSortingFileWriter.java | 115 ++++++++++++++ .../presto/iceberg/IcebergTableHandle.java | 16 +- .../iceberg/IcebergTableProperties.java | 18 +++ .../facebook/presto/iceberg/IcebergUtil.java | 12 ++ .../iceberg/IcebergWritableTableHandle.java | 11 +- .../InternalIcebergConnectorFactory.java | 2 + .../presto/iceberg/PartitionFields.java | 33 +++++ .../facebook/presto/iceberg/SortField.java | 110 ++++++++++++++ .../presto/iceberg/SortFieldUtils.java | 140 ++++++++++++++++++ .../IcebergEqualityDeleteAsJoin.java | 6 +- .../iceberg/IcebergDistributedTestBase.java | 140 ++++++++++++++++++ .../presto/iceberg/TestIcebergConfig.java | 7 +- .../presto/iceberg/TestSortFieldUtils.java | 140 ++++++++++++++++++ .../presto/spi/StandardErrorCode.java | 3 +- 33 files changed, 1143 insertions(+), 71 deletions(-) create mode 100644 presto-hive/src/main/java/com/facebook/presto/hive/SortingFileWriterConfig.java create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestSortingFileWriterConfig.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSortingFileWriter.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortField.java create mode 100644 presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortFieldUtils.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSortFieldUtils.java diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index d33f73d0a5936..0f06597650b4e 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -227,7 +227,7 @@ Property Name Description Example: ``hdfs://nn:8020/warehouse/path`` This property is required if the ``iceberg.catalog.type`` is - ``hadoop``. + ``hadoop``. Otherwise, it will be ignored. ``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10`` required if the ``iceberg.catalog.type`` is ``hadoop``. @@ -1531,3 +1531,45 @@ Map of PrestoDB types to the relevant Iceberg types: - ``TIMESTAMP WITH ZONE`` No other types are supported. + + +Sorted Tables +^^^^^^^^^^^^^ + +The Iceberg connector supports the creation of sorted tables. +Data in the Iceberg table is sorted during the writing process within each file. + +Sorted Iceberg tables can provide a huge increase in performance in query times. +Sorting is particularly beneficial when the sorted columns show a +high cardinality and are used as a filter for selective reads. + +Configure sort order with the ``sorted_by`` table property to specify an array of +one or more columns to use for sorting. +The following example creates the table with the ``sorted_by`` property, and sorts the file based +on the field ``join_date``. + +.. code-block:: text + + CREATE TABLE emp.employees.employee ( + emp_id BIGINT, + emp_name VARCHAR, + join_date DATE, + country VARCHAR) + WITH ( + sorted_by = ARRAY['join_date'] + ) + +Sorting can be combined with partitioning on the same column. For example:: + + CREATE TABLE emp.employees.employee ( + emp_id BIGINT, + emp_name VARCHAR, + join_date DATE, + country VARCHAR) + WITH ( + partitioning = ARRAY['month(join_date)'], + sorted_by = ARRAY['join_date'] + ) + +To disable the sorted writing, set the session property +``sorted_writing_enabled`` to ``false``. \ No newline at end of file diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 0c8eaddda2809..aeb085487c2f3 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -19,6 +19,7 @@ import com.facebook.airlift.configuration.LegacyConfig; import com.facebook.drift.transport.netty.codec.Protocol; import com.facebook.presto.hive.s3.S3FileSystemType; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; @@ -30,7 +31,6 @@ import javax.validation.constraints.DecimalMax; import javax.validation.constraints.DecimalMin; -import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -45,6 +45,7 @@ import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.OVERWRITE; import static com.facebook.presto.hive.HiveSessionProperties.INSERT_EXISTING_PARTITIONS_BEHAVIOR; import static com.facebook.presto.hive.HiveStorageFormat.ORC; +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -75,7 +76,7 @@ public class HiveClientConfig private int splitLoaderConcurrency = 4; private DataSize maxInitialSplitSize; private int domainCompactionThreshold = 100; - private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE); + private NodeSelectionStrategy nodeSelectionStrategy = NO_PREFERENCE; private boolean recursiveDirWalkerEnabled; private int maxConcurrentFileRenames = 20; @@ -101,7 +102,6 @@ public class HiveClientConfig private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true; private InsertExistingPartitionsBehavior insertExistingPartitionsBehavior; private int maxPartitionsPerWriter = 100; - private int maxOpenSortFiles = 50; private int writeValidationThreads = 16; private List resourceConfigFiles = ImmutableList.of(); @@ -278,17 +278,15 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho return this; } - @MinDataSize("1MB") - @MaxDataSize("1GB") - public DataSize getWriterSortBufferSize() + public NodeSelectionStrategy getNodeSelectionStrategy() { - return writerSortBufferSize; + return nodeSelectionStrategy; } - @Config("hive.writer-sort-buffer-size") - public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize) + @Config("hive.node-selection-strategy") + public HiveClientConfig setNodeSelectionStrategy(NodeSelectionStrategy nodeSelectionStrategy) { - this.writerSortBufferSize = writerSortBufferSize; + this.nodeSelectionStrategy = nodeSelectionStrategy; return this; } @@ -694,22 +692,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter) this.maxPartitionsPerWriter = maxPartitionsPerWriter; return this; } - - @Min(2) - @Max(1000) - public int getMaxOpenSortFiles() - { - return maxOpenSortFiles; - } - - @Config("hive.max-open-sort-files") - @ConfigDescription("Maximum number of writer temporary files to read in one pass") - public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles) - { - this.maxOpenSortFiles = maxOpenSortFiles; - return this; - } - public int getWriteValidationThreads() { return writeValidationThreads; diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index 716d067b9cfc8..ecbb9e71f9314 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -129,7 +129,7 @@ public void configure(Binder binder) binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON); newSetBinder(binder, DynamicConfigurationProvider.class); configBinder(binder).bindConfig(HiveClientConfig.class); - + configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive"); binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON); binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON); binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java index 6b11ce79318a0..49787627b609b 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java @@ -91,6 +91,7 @@ public HivePageSinkProvider( TypeManager typeManager, HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig, + SortingFileWriterConfig sortingFileWriterConfig, LocationService locationService, JsonCodec partitionUpdateCodec, SmileCodec partitionUpdateSmileCodec, @@ -110,8 +111,8 @@ public HivePageSinkProvider( this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter(); - this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles(); - this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null"); + this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles(); + this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null"); this.immutablePartitions = hiveClientConfig.isImmutablePartitions(); this.locationService = requireNonNull(locationService, "locationService is null"); this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s"))); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/SortingFileWriterConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/SortingFileWriterConfig.java new file mode 100644 index 0000000000000..5dd30a7933719 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/hive/SortingFileWriterConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; +import io.airlift.units.DataSize; +import io.airlift.units.MaxDataSize; +import io.airlift.units.MinDataSize; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import static io.airlift.units.DataSize.Unit.MEGABYTE; + +public class SortingFileWriterConfig +{ + private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE); + private int maxOpenSortFiles = 50; + + @MinDataSize("1MB") + @MaxDataSize("1GB") + public DataSize getWriterSortBufferSize() + { + return writerSortBufferSize; + } + + @Config("writer-sort-buffer-size") + public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize) + { + this.writerSortBufferSize = writerSortBufferSize; + return this; + } + + @Min(2) + @Max(1000) + public int getMaxOpenSortFiles() + { + return maxOpenSortFiles; + } + + @Config("max-open-sort-files") + @ConfigDescription("Maximum number of writer temporary files to read in one pass") + public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles) + { + this.maxOpenSortFiles = maxOpenSortFiles; + return this; + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index 6d36b4863cf35..0fd93782915bc 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi FUNCTION_AND_TYPE_MANAGER, getHiveClientConfig(), getMetastoreClientConfig(), + getSortingFileWriterConfig(), locationService, HiveTestUtils.PARTITION_UPDATE_CODEC, HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, @@ -1099,8 +1100,6 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi protected HiveClientConfig getHiveClientConfig() { return new HiveClientConfig() - .setMaxOpenSortFiles(10) - .setWriterSortBufferSize(new DataSize(100, KILOBYTE)) .setTemporaryTableSchema(database) .setCreateEmptyBucketFilesForTemporaryTable(false); } @@ -1110,6 +1109,12 @@ protected HiveCommonClientConfig getHiveCommonClientConfig() return new HiveCommonClientConfig(); } + protected SortingFileWriterConfig getSortingFileWriterConfig() + { + return new SortingFileWriterConfig() + .setMaxOpenSortFiles(10) + .setWriterSortBufferSize(new DataSize(100, KILOBYTE)); + } protected CacheConfig getCacheConfig() { return new CacheConfig().setCacheQuotaScope(CACHE_SCOPE).setDefaultCacheQuota(DEFAULT_QUOTA_SIZE); @@ -3109,7 +3114,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath true); assertThat(listAllDataFiles(context, path)) .filteredOn(file -> file.contains(".tmp-sort")) - .size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2); + .size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2); // finish the write Collection fragments = getFutureValue(sink.finish()); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java index a692ba3d9ee44..657f620d19ea2 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java @@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength)); } } @@ -152,11 +153,11 @@ private static String makeFileName(File tempDir, HiveClientConfig config) return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getCompressionCodec().name(); } - private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath) + private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath, SortingFileWriterConfig sortingFileWriterConfig) { HiveTransactionHandle transaction = new HiveTransactionHandle(); HiveWriterStats stats = new HiveWriterStats(); - ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats); + ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats, sortingFileWriterConfig); List columns = getTestColumns(); List columnTypes = columns.stream() .map(LineItemColumn::getType) @@ -308,7 +309,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE, new RuntimeStats()); } - private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats) + private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats, SortingFileWriterConfig sortingFileWriterConfig) { LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, Optional.empty(), NEW, DIRECT_TO_TARGET_NEW_DIRECTORY); HiveOutputTableHandle handle = new HiveOutputTableHandle( @@ -337,6 +338,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio FUNCTION_AND_TYPE_MANAGER, config, metastoreClientConfig, + sortingFileWriterConfig, new HiveLocationService(hdfsEnvironment), HiveTestUtils.PARTITION_UPDATE_CODEC, HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC, diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestSortingFileWriterConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestSortingFileWriterConfig.java new file mode 100644 index 0000000000000..b1da10e5467f8 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestSortingFileWriterConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; + +public class TestSortingFileWriterConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(SortingFileWriterConfig.class) + .setWriterSortBufferSize(new DataSize(64, MEGABYTE)) + .setMaxOpenSortFiles(50)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("writer-sort-buffer-size", "1GB") + .put("max-open-sort-files", "3") + .build(); + SortingFileWriterConfig expected = new SortingFileWriterConfig() + .setWriterSortBufferSize(new DataSize(1, GIGABYTE)) + .setMaxOpenSortFiles(3); + assertFullMapping(properties, expected); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index 75b6cdebe6c70..7fee550f8a901 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -78,6 +78,7 @@ import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.types.Type; @@ -119,6 +120,7 @@ import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION; import static com.facebook.presto.iceberg.IcebergTableProperties.LOCATION_PROPERTY; import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; +import static com.facebook.presto.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; @@ -131,6 +133,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData; import static com.facebook.presto.iceberg.IcebergUtil.getPartitions; import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator; +import static com.facebook.presto.iceberg.IcebergUtil.getSortFields; import static com.facebook.presto.iceberg.IcebergUtil.getTableComment; import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; @@ -142,6 +145,7 @@ import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName; import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm; import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields; +import static com.facebook.presto.iceberg.SortFieldUtils.toSortFields; import static com.facebook.presto.iceberg.TableStatisticsMaker.getSupportedColumnStatistics; import static com.facebook.presto.iceberg.TypeConverter.toIcebergType; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; @@ -166,9 +170,9 @@ public abstract class IcebergAbstractMetadata { protected final TypeManager typeManager; protected final JsonCodec commitTaskCodec; + protected Transaction transaction; protected final NodeVersion nodeVersion; protected final RowExpressionService rowExpressionService; - protected Transaction transaction; private final StandardFunctionResolution functionResolution; private final ConcurrentMap icebergTables = new ConcurrentHashMap<>(); @@ -442,7 +446,32 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(IcebergTableHandle getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), getFileFormat(icebergTable), - icebergTable.properties()); + icebergTable.properties(), + getSupportedSortFields(icebergTable.schema(), icebergTable.sortOrder())); + } + + public static List getSupportedSortFields(Schema schema, SortOrder sortOrder) + { + if (!sortOrder.isSorted()) { + return ImmutableList.of(); + } + Set baseColumnFieldIds = schema.columns().stream() + .map(Types.NestedField::fieldId) + .collect(toImmutableSet()); + + ImmutableList.Builder sortFields = ImmutableList.builder(); + for (org.apache.iceberg.SortField sortField : sortOrder.fields()) { + if (!sortField.transform().isIdentity()) { + continue; + } + if (!baseColumnFieldIds.contains(sortField.sourceId())) { + continue; + } + + sortFields.add(SortField.fromIceberg(sortField)); + } + + return sortFields.build(); } @Override @@ -546,6 +575,12 @@ protected ImmutableMap createMetadataProperties(Table icebergTab properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable)); + SortOrder sortOrder = icebergTable.sortOrder(); + // TODO: Support sort column transforms (https://github.com/trinodb/trino/issues/15088) + if (sortOrder.isSorted() && sortOrder.fields().stream().allMatch(sortField -> sortField.transform().isIdentity())) { + List sortColumnNames = toSortFields(sortOrder); + properties.put(SORTED_BY_PROPERTY, sortColumnNames); + } return properties.build(); } @@ -757,7 +792,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa tryGetProperties(table), tableSchemaJson, Optional.empty(), - Optional.empty()); + Optional.empty(), + getSortFields(table)); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index ba6db6eec5dfa..f70994f261f93 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -32,8 +32,12 @@ import com.facebook.presto.hive.HiveNodePartitioningProvider; import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.OrcFileWriterFactory; import com.facebook.presto.hive.ParquetFileWriterConfig; +import com.facebook.presto.hive.SortingFileWriterConfig; import com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration; +import com.facebook.presto.hive.datasink.DataSinkFactory; +import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory; import com.facebook.presto.hive.gcs.GcsConfigurationInitializer; import com.facebook.presto.hive.gcs.HiveGcsConfig; import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; @@ -119,6 +123,7 @@ public void setup(Binder binder) binder.bind(HdfsConfiguration.class).annotatedWith(ForMetastoreHdfsEnvironment.class).to(HiveCachingHdfsConfiguration.class).in(Scopes.SINGLETON); binder.bind(HdfsConfiguration.class).annotatedWith(ForCachingFileSystem.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(SortingFileWriterConfig.class, "iceberg"); binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON); newSetBinder(binder, DynamicConfigurationProvider.class); @@ -136,6 +141,8 @@ public void setup(Binder binder) binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON); newExporter(binder).export(ConnectorSplitManager.class).as(generatedNameOf(IcebergSplitManager.class, connectorId)); binder.bind(ConnectorPageSourceProvider.class).to(IcebergPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(DataSinkFactory.class).to(OutputStreamDataSinkFactory.class).in(Scopes.SINGLETON); + binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(IcebergPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index e49832a64ec9b..6eb9791c7fb7c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -52,6 +52,7 @@ public class IcebergConfig private double statisticSnapshotRecordDifferenceWeight; private boolean pushdownFilterEnabled; private boolean deleteAsJoinRewriteEnabled = true; + private boolean sortedWritingEnabled = true; private int rowsForMetadataOptimizationThreshold = 1000; private EnumSet hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class); @@ -349,4 +350,17 @@ public IcebergConfig setSplitManagerThreads(int splitManagerThreads) this.splitManagerThreads = splitManagerThreads; return this; } + + public boolean isSortedWritingEnabled() + { + return sortedWritingEnabled; + } + + @Config("iceberg.sorted-writing-enabled") + @ConfigDescription("Enable sorted writing to tables with a specified sort order") + public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled) + { + this.sortedWritingEnabled = sortedWritingEnabled; + return this; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java index 36dda85989ea4..bbf4c06ecd218 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergErrorCode.java @@ -33,11 +33,13 @@ public enum IcebergErrorCode ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL), ICEBERG_WRITER_OPEN_ERROR(7, EXTERNAL), ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL), + ICEBERG_CURSOR_ERROR(9, EXTERNAL), ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR), ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR), ICEBERG_INVALID_TABLE_TIMESTAMP(12, USER_ERROR), ICEBERG_ROLLBACK_ERROR(13, EXTERNAL), - ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR); + ICEBERG_INVALID_FORMAT_VERSION(14, USER_ERROR), + ICEBERG_COMMIT_ERROR(15, EXTERNAL); private final ErrorCode errorCode; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 26652ab35128b..06efa8150ac6e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -72,6 +72,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; @@ -113,6 +114,7 @@ import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning; +import static com.facebook.presto.iceberg.IcebergTableProperties.getSortOrder; import static com.facebook.presto.iceberg.IcebergTableProperties.getTableLocation; import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties; @@ -124,6 +126,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties; import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; +import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields; import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize; import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics; import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; @@ -303,9 +306,9 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con if (operations.current() != null) { throw new TableAlreadyExistsException(schemaTableName); } - + SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties())); FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); - TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat, session)); + TableMetadata metadata = newTableMetadata(schema, partitionSpec, sortOrder, targetPath, populateTableProperties(tableMetadata, fileFormat, session)); transaction = createTableTransaction(tableName, operations, metadata); return new IcebergWritableTableHandle( @@ -316,7 +319,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con getColumns(metadata.schema(), metadata.spec(), typeManager), targetPath, fileFormat, - metadata.properties()); + metadata.properties(), + getSupportedSortFields(metadata.schema(), sortOrder)); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 598f8427ff745..ee9e4610dfbb0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -57,6 +57,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -187,7 +188,9 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), icebergTable.location(), fileFormat, - icebergTable.properties()); + icebergTable.properties(), + transaction.replaceSortOrder().apply().fields().stream().map(SortField::fromIceberg) + .collect(toImmutableList())); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java index 9184a46e13359..ebf1da9888819 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java @@ -17,6 +17,7 @@ import com.facebook.presto.common.Page; import com.facebook.presto.common.block.Block; import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.block.SortOrder; import com.facebook.presto.common.function.SqlFunctionProperties; import com.facebook.presto.common.type.BigintType; import com.facebook.presto.common.type.BooleanType; @@ -30,18 +31,23 @@ import com.facebook.presto.common.type.TimestampType; import com.facebook.presto.common.type.TinyintType; import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.VarbinaryType; import com.facebook.presto.common.type.VarcharType; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.OrcFileWriterFactory; import com.facebook.presto.iceberg.PartitionTransforms.ColumnTransform; import com.facebook.presto.spi.ConnectorPageSink; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PageIndexer; import com.facebook.presto.spi.PageIndexerFactory; +import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; +import io.airlift.units.DataSize; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.MetricsConfig; @@ -49,7 +55,9 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.types.Types; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -64,8 +72,12 @@ import java.util.function.Function; import static com.facebook.presto.common.type.Decimals.readBigDecimal; +import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf; +import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; +import static com.facebook.presto.iceberg.IcebergSessionProperties.isSortedWritingEnabled; +import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.PartitionTransforms.getColumnTransform; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; @@ -106,6 +118,18 @@ public class IcebergPageSink private long systemMemoryUsage; private long validationCpuNanos; + private final List sortOrder; + private final boolean sortedWritingEnabled; + private final DataSize sortingFileWriterBufferSize; + private final Integer sortingFileWriterMaxOpenFiles; + private final Path tempDirectory; + private final TypeManager typeManager; + private final PageSorter pageSorter; + private final List columnTypes; + private final List sortColumnIndexes; + private final List sortOrders; + private final OrcFileWriterFactory orcFileWriterFactory; + public IcebergPageSink( Schema outputSchema, PartitionSpec partitionSpec, @@ -118,7 +142,13 @@ public IcebergPageSink( JsonCodec jsonCodec, ConnectorSession session, FileFormat fileFormat, - int maxOpenWriters) + int maxOpenWriters, + List sortOrder, + DataSize sortingFileWriterBufferSize, + int sortingFileWriterMaxOpenFiles, + TypeManager typeManager, + PageSorter pageSorter, + OrcFileWriterFactory orcFileWriterFactory) { requireNonNull(inputColumns, "inputColumns is null"); this.outputSchema = requireNonNull(outputSchema, "outputSchema is null"); @@ -135,6 +165,37 @@ public IcebergPageSink( this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec), session); + this.sortOrder = requireNonNull(sortOrder, "sortOrder is null"); + this.sortedWritingEnabled = isSortedWritingEnabled(session); + this.sortingFileWriterBufferSize = requireNonNull(sortingFileWriterBufferSize, "sortingFileWriterBufferSize is null"); + this.sortingFileWriterMaxOpenFiles = sortingFileWriterMaxOpenFiles; + String tempDirectoryPath = locationProvider.newDataLocation("sort-tmp-files"); + this.tempDirectory = new Path(tempDirectoryPath); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); + this.columnTypes = getColumns(outputSchema, partitionSpec, typeManager).stream() + .map(IcebergColumnHandle::getType) + .collect(toImmutableList()); + this.orcFileWriterFactory = requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null"); + + if (sortedWritingEnabled) { + ImmutableList.Builder sortColumnIndexes = ImmutableList.builder(); + ImmutableList.Builder sortOrders = ImmutableList.builder(); + for (SortField sortField : sortOrder) { + Types.NestedField column = outputSchema.findField(sortField.getSourceColumnId()); + if (column == null) { + throw new PrestoException(ICEBERG_INVALID_METADATA, "Unable to find sort field source column in the table schema: " + sortField); + } + sortColumnIndexes.add(outputSchema.columns().indexOf(column)); + sortOrders.add(sortField.getSortOrder()); + } + this.sortColumnIndexes = sortColumnIndexes.build(); + this.sortOrders = sortOrders.build(); + } + else { + this.sortColumnIndexes = ImmutableList.of(); + this.sortOrders = ImmutableList.of(); + } } @Override @@ -294,12 +355,46 @@ private int[] getWriterIndexes(Page page) Page transformedPage = pagePartitioner.getTransformedPage(); for (int position = 0; position < page.getPositionCount(); position++) { int writerIndex = writerIndexes[position]; - if (writers.get(writerIndex) != null) { + WriteContext writer = writers.get(writerIndex); + if (writer != null) { continue; } Optional partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position); - WriteContext writer = createWriter(partitionData); + + String fileName = fileFormat.addExtension(randomUUID().toString()); + Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName))) + .orElse(new Path(locationProvider.newDataLocation(fileName))); + + FileSystem fileSystem; + try { + fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf); + } + catch (IOException e) { + throw new PrestoException(HIVE_WRITER_OPEN_ERROR, e); + } + + if (!sortOrder.isEmpty() && sortedWritingEnabled) { + Path tempFilePrefix = new Path(tempDirectory, format("sorting-file-writer-%s-%s", session.getQueryId(), randomUUID())); + WriteContext writerContext = createWriter(partitionData, outputPath); + IcebergFileWriter sortedFileWriter = new IcebergSortingFileWriter( + fileSystem, + tempFilePrefix, + writerContext.getWriter(), + sortingFileWriterBufferSize, + sortingFileWriterMaxOpenFiles, + columnTypes, + sortColumnIndexes, + sortOrders, + pageSorter, + false, + session, + orcFileWriterFactory); + writer = new WriteContext(sortedFileWriter, outputPath, partitionData); + } + else { + writer = createWriter(partitionData, outputPath); + } writers.set(writerIndex, writer); } @@ -309,12 +404,8 @@ private int[] getWriterIndexes(Page page) return writerIndexes; } - private WriteContext createWriter(Optional partitionData) + private WriteContext createWriter(Optional partitionData, Path outputPath) { - String fileName = fileFormat.addExtension(randomUUID().toString()); - Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName))) - .orElse(new Path(locationProvider.newDataLocation(fileName))); - IcebergFileWriter writer = fileWriterFactory.createFileWriter( outputPath, outputSchema, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java index 7e91ebf4a7028..4b41f38a67a22 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSinkProvider.java @@ -14,17 +14,22 @@ package com.facebook.presto.iceberg; import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.OrcFileWriterFactory; +import com.facebook.presto.hive.SortingFileWriterConfig; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PageIndexerFactory; import com.facebook.presto.spi.PageSinkContext; +import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import io.airlift.units.DataSize; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; @@ -45,13 +50,24 @@ public class IcebergPageSinkProvider private final PageIndexerFactory pageIndexerFactory; private final int maxOpenPartitions; + private final DataSize sortingFileWriterBufferSize; + private final int sortingFileWriterMaxOpenFiles; + private final TypeManager typeManager; + private final PageSorter pageSorter; + + private final OrcFileWriterFactory orcFileWriterFactory; + @Inject public IcebergPageSinkProvider( HdfsEnvironment hdfsEnvironment, JsonCodec jsonCodec, IcebergFileWriterFactory fileWriterFactory, PageIndexerFactory pageIndexerFactory, - IcebergConfig icebergConfig) + IcebergConfig icebergConfig, + SortingFileWriterConfig sortingFileWriterConfig, + TypeManager typeManager, + PageSorter pageSorter, + OrcFileWriterFactory orcFileWriterFactory) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); @@ -59,6 +75,11 @@ public IcebergPageSinkProvider( this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null"); requireNonNull(icebergConfig, "icebergConfig is null"); this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter(); + this.sortingFileWriterBufferSize = sortingFileWriterConfig.getWriterSortBufferSize(); + this.sortingFileWriterMaxOpenFiles = sortingFileWriterConfig.getMaxOpenSortFiles(); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); + this.orcFileWriterFactory = requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null"); } @Override @@ -92,6 +113,12 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab jsonCodec, session, tableHandle.getFileFormat(), - maxOpenPartitions); + maxOpenPartitions, + tableHandle.getSortOrder(), + sortingFileWriterBufferSize, + sortingFileWriterMaxOpenFiles, + typeManager, + pageSorter, + orcFileWriterFactory); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index 5a597d97051b4..6c3b1ae4b1a16 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -66,6 +66,7 @@ public final class IcebergSessionProperties public static final String ROWS_FOR_METADATA_OPTIMIZATION_THRESHOLD = "rows_for_metadata_optimization_threshold"; private final List> sessionProperties; + private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled"; @Inject public IcebergSessionProperties( @@ -184,6 +185,11 @@ public IcebergSessionProperties( "of an Iceberg table exceeds this threshold, metadata optimization would be skipped for " + "the table. A value of 0 means skip metadata optimization directly.", icebergConfig.getRowsForMetadataOptimizationThreshold(), + false)) + .add(booleanProperty( + SORTED_WRITING_ENABLED, + "Enable sorted writing to tables with a specified sort order", + icebergConfig.isSortedWritingEnabled(), false)); nessieConfig.ifPresent((config) -> propertiesBuilder @@ -313,4 +319,9 @@ public static String getNessieReferenceHash(ConnectorSession session) { return session.getProperty(NESSIE_REFERENCE_HASH, String.class); } + + public static boolean isSortedWritingEnabled(ConnectorSession session) + { + return session.getProperty(SORTED_WRITING_ENABLED, Boolean.class); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSortingFileWriter.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSortingFileWriter.java new file mode 100644 index 0000000000000..796f1f36a3f81 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSortingFileWriter.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.SortOrder; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.hive.OrcFileWriterFactory; +import com.facebook.presto.hive.SortingFileWriter; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PageSorter; +import io.airlift.units.DataSize; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Metrics; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class IcebergSortingFileWriter + implements IcebergFileWriter +{ + private final IcebergFileWriter outputWriter; + private final SortingFileWriter sortingFileWriter; + + public IcebergSortingFileWriter( + FileSystem fileSystem, + Path tempFilePrefix, + IcebergFileWriter outputWriter, + DataSize maxMemory, + int maxOpenTempFiles, + List types, + List sortFields, + List sortOrders, + PageSorter pageSorter, + boolean sortedWriteToTempPathEnabled, + ConnectorSession session, + OrcFileWriterFactory orcFileWriterFactory) + { + this.outputWriter = requireNonNull(outputWriter, "outputWriter is null"); + this.sortingFileWriter = new SortingFileWriter( + fileSystem, + tempFilePrefix, + outputWriter, + maxMemory, + maxOpenTempFiles, + types, + sortFields, + sortOrders, + pageSorter, + (fs, p) -> orcFileWriterFactory.createDataSink(session, fs, p), + sortedWriteToTempPathEnabled); + } + + @Override + public Metrics getMetrics() + { + return outputWriter.getMetrics(); + } + + @Override + public long getWrittenBytes() + { + return sortingFileWriter.getWrittenBytes(); + } + + @Override + public long getSystemMemoryUsage() + { + return sortingFileWriter.getSystemMemoryUsage(); + } + + @Override + public void appendRows(Page dataPage) + { + sortingFileWriter.appendRows(dataPage); + } + + @Override + public Optional commit() + { + return sortingFileWriter.commit(); + } + + @Override + public void rollback() + { + sortingFileWriter.rollback(); + } + + @Override + public long getValidationCpuNanos() + { + return sortingFileWriter.getValidationCpuNanos(); + } + + @Override + public long getFileSizeInBytes() + { + return sortingFileWriter.getFileSizeInBytes(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java index 6e3e8bb8467d7..9d6b2f9f4a41f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableHandle.java @@ -16,7 +16,9 @@ import com.facebook.presto.hive.BaseHiveTableHandle; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -34,6 +36,7 @@ public class IcebergTableHandle private final Optional tableSchemaJson; private final Optional> partitionFieldIds; private final Optional> equalityFieldIds; + private final List sortOrder; @JsonCreator public IcebergTableHandle( @@ -44,7 +47,8 @@ public IcebergTableHandle( @JsonProperty("storageProperties") Optional> storageProperties, @JsonProperty("tableSchemaJson") Optional tableSchemaJson, @JsonProperty("partitionFieldIds") Optional> partitionFieldIds, - @JsonProperty("equalityFieldIds") Optional> equalityFieldIds) + @JsonProperty("equalityFieldIds") Optional> equalityFieldIds, + @JsonProperty("sortOrder") List sortOrder) { super(schemaName, icebergTableName.getTableName()); @@ -55,6 +59,7 @@ public IcebergTableHandle( this.tableSchemaJson = requireNonNull(tableSchemaJson, "tableSchemaJson is null"); this.partitionFieldIds = requireNonNull(partitionFieldIds, "partitionFieldIds is null"); this.equalityFieldIds = requireNonNull(equalityFieldIds, "equalityFieldIds is null"); + this.sortOrder = ImmutableList.copyOf(requireNonNull(sortOrder, "sortOrder is null")); } @JsonProperty @@ -69,6 +74,12 @@ public boolean isSnapshotSpecified() return snapshotSpecified; } + @JsonProperty + public List getSortOrder() + { + return sortOrder; + } + @JsonProperty public Optional getTableSchemaJson() { @@ -113,6 +124,7 @@ public boolean equals(Object o) return Objects.equals(getSchemaName(), that.getSchemaName()) && Objects.equals(icebergTableName, that.icebergTableName) && snapshotSpecified == that.snapshotSpecified && + Objects.equals(sortOrder, that.sortOrder) && Objects.equals(tableSchemaJson, that.tableSchemaJson) && Objects.equals(equalityFieldIds, that.equalityFieldIds); } @@ -120,7 +132,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(getSchemaName(), icebergTableName, snapshotSpecified, tableSchemaJson, equalityFieldIds); + return Objects.hash(getSchemaName(), icebergTableName, sortOrder, snapshotSpecified, tableSchemaJson, equalityFieldIds); } @Override diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java index bda438b100a8f..53ca483dd0802 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java @@ -36,6 +36,8 @@ public class IcebergTableProperties { public static final String FILE_FORMAT_PROPERTY = "format"; public static final String PARTITIONING_PROPERTY = "partitioning"; + + public static final String SORTED_BY_PROPERTY = "sorted_by"; public static final String LOCATION_PROPERTY = "location"; public static final String FORMAT_VERSION = "format_version"; public static final String COMMIT_RETRIES = "commit_retries"; @@ -74,6 +76,15 @@ public IcebergTableProperties(IcebergConfig icebergConfig) "File system location URI for the table", null, false)) + .add(new PropertyMetadata<>( + SORTED_BY_PROPERTY, + "Sorted columns", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)) .add(stringProperty( FORMAT_VERSION, "Format version for the table", @@ -124,6 +135,13 @@ public static List getPartitioning(Map tableProperties) return partitioning == null ? ImmutableList.of() : ImmutableList.copyOf(partitioning); } + @SuppressWarnings("unchecked") + public static List getSortOrder(Map tableProperties) + { + List sortedBy = (List) tableProperties.get(SORTED_BY_PROPERTY); + return sortedBy == null ? ImmutableList.of() : ImmutableList.copyOf(sortedBy); + } + public static String getTableLocation(Map tableProperties) { return (String) tableProperties.get(LOCATION_PROPERTY); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 7ccd43b93dd5f..d6a5fe6c9fd0c 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -471,6 +471,18 @@ public static Optional tryGetLocation(Table table) } } + public static List getSortFields(Table table) + { + try { + return table.sortOrder().fields().stream() + .map(SortField::fromIceberg) + .collect(toImmutableList()); + } + catch (Exception e) { + log.warn(String.format("Unable to fetch sort fields for table %s: %s", table.name(), e.getMessage())); + return ImmutableList.of(); + } + } private static boolean isValidPartitionType(FileFormat fileFormat, Type type) { return type instanceof DecimalType || diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergWritableTableHandle.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergWritableTableHandle.java index 0ad0e163d8291..d2175ec4e51f2 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergWritableTableHandle.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergWritableTableHandle.java @@ -35,6 +35,7 @@ public class IcebergWritableTableHandle private final String outputPath; private final FileFormat fileFormat; private final Map storageProperties; + private final List sortOrder; @JsonCreator public IcebergWritableTableHandle( @@ -45,7 +46,8 @@ public IcebergWritableTableHandle( @JsonProperty("inputColumns") List inputColumns, @JsonProperty("outputPath") String outputPath, @JsonProperty("fileFormat") FileFormat fileFormat, - @JsonProperty("storageProperties") Map storageProperties) + @JsonProperty("storageProperties") Map storageProperties, + @JsonProperty("sortOrder") List sortOrder) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -55,6 +57,7 @@ public IcebergWritableTableHandle( this.outputPath = requireNonNull(outputPath, "filePrefix is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); this.storageProperties = requireNonNull(storageProperties, "storageProperties is null"); + this.sortOrder = ImmutableList.copyOf(requireNonNull(sortOrder, "sortOrder is null")); } @JsonProperty @@ -110,4 +113,10 @@ public String toString() { return schemaName + "." + tableName; } + + @JsonProperty + public List getSortOrder() + { + return sortOrder; + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java index bab2cb74a63d3..3caa22f40f762 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java @@ -30,6 +30,7 @@ import com.facebook.presto.plugin.base.security.AllowAllAccessControl; import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.PageIndexerFactory; +import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; import com.facebook.presto.spi.connector.Connector; import com.facebook.presto.spi.connector.ConnectorContext; @@ -93,6 +94,7 @@ public static Connector createConnector( binder.bind(NodeManager.class).toInstance(context.getNodeManager()); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); + binder.bind(PageSorter.class).toInstance(context.getPageSorter()); binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionFields.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionFields.java index bbc47cd4adbb3..973120d54abf8 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionFields.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionFields.java @@ -27,6 +27,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.Integer.parseInt; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; import static org.apache.iceberg.expressions.Expressions.bucket; import static org.apache.iceberg.expressions.Expressions.day; import static org.apache.iceberg.expressions.Expressions.hour; @@ -41,6 +42,12 @@ public final class PartitionFields private static final String FUNCTION_NAME = "\\((" + NAME + ")\\)"; private static final String FUNCTION_NAME_INT = "\\((" + NAME + "), *(\\d+)\\)"; + private static final String UNQUOTED_IDENTIFIER = "[a-zA-Z_][a-zA-Z0-9_]*"; + private static final String QUOTED_IDENTIFIER = "\"(?:\"\"|[^\"])*\""; + public static final String IDENTIFIER = "(" + UNQUOTED_IDENTIFIER + "|" + QUOTED_IDENTIFIER + ")"; + private static final Pattern UNQUOTED_IDENTIFIER_PATTERN = Pattern.compile(UNQUOTED_IDENTIFIER); + private static final Pattern QUOTED_IDENTIFIER_PATTERN = Pattern.compile(QUOTED_IDENTIFIER); + private static final Pattern IDENTITY_PATTERN = Pattern.compile(NAME); private static final Pattern YEAR_PATTERN = Pattern.compile("year" + FUNCTION_NAME); private static final Pattern MONTH_PATTERN = Pattern.compile("month" + FUNCTION_NAME); @@ -179,4 +186,30 @@ private static String toPartitionField(PartitionSpec spec, PartitionField field) throw new UnsupportedOperationException("Unsupported partition transform: " + field); } + + public static String quotedName(String name) + { + if (UNQUOTED_IDENTIFIER_PATTERN.matcher(name).matches()) { + return name; + } + return '"' + name.replace("\"", "\"\"") + '"'; + } + + public static String fromIdentifierToColumn(String identifier) + { + if (QUOTED_IDENTIFIER_PATTERN.matcher(identifier).matches()) { + // We only support lowercase quoted identifiers for now. + // See https://github.com/trinodb/trino/issues/12226#issuecomment-1128839259 + // TODO: Enhance quoted identifiers support in Iceberg partitioning to support mixed case identifiers + // See https://github.com/trinodb/trino/issues/12668 + if (!identifier.toLowerCase(ENGLISH).equals(identifier)) { + throw new IllegalArgumentException(format("Uppercase characters in identifier '%s' are not supported.", identifier)); + } + return identifier.substring(1, identifier.length() - 1).replace("\"\"", "\""); + } + // Currently, all Iceberg columns are stored in lowercase in the Iceberg metadata files. + // Unquoted identifiers are canonicalized to lowercase here which is not according ANSI SQL spec. + // See https://github.com/trinodb/trino/issues/17 + return identifier.toLowerCase(ENGLISH); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortField.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortField.java new file mode 100644 index 0000000000000..fa13b8aeca509 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortField.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.common.block.SortOrder; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class SortField +{ + private final int sourceColumnId; + private final SortOrder sortOrder; + + @JsonCreator + public SortField(@JsonProperty("sourceColumnId") int sourceColumnId, @JsonProperty("sortOrder") SortOrder sortOrder) + { + this.sourceColumnId = sourceColumnId; + this.sortOrder = requireNonNull(sortOrder, "sortOrder is null"); + } + public static SortField fromIceberg(org.apache.iceberg.SortField sortField) + { + SortOrder sortOrder; + switch (sortField.direction()) { + case ASC: + switch (sortField.nullOrder()) { + case NULLS_FIRST: + sortOrder = SortOrder.ASC_NULLS_FIRST; + break; + case NULLS_LAST: + sortOrder = SortOrder.ASC_NULLS_LAST; + break; + default: + throw new IllegalStateException("Unexpected null order: " + sortField.nullOrder()); + } + break; + case DESC: + switch (sortField.nullOrder()) { + case NULLS_FIRST: + sortOrder = SortOrder.DESC_NULLS_FIRST; + break; + case NULLS_LAST: + sortOrder = SortOrder.DESC_NULLS_LAST; + break; + default: + throw new IllegalStateException("Unexpected null order: " + sortField.nullOrder()); + } + break; + default: + throw new IllegalStateException("Unexpected sort direction: " + sortField.direction()); + } + + return new SortField(sortField.sourceId(), sortOrder); + } + + @JsonProperty + public int getSourceColumnId() + { + return sourceColumnId; + } + + @JsonProperty + public SortOrder getSortOrder() + { + return sortOrder; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SortField that = (SortField) o; + return sourceColumnId == that.sourceColumnId && sortOrder == that.sortOrder; + } + + @Override + public int hashCode() + { + return Objects.hash(sourceColumnId, sortOrder); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("sourceColumnId", sourceColumnId) + .add("sortOrder", sortOrder) + .toString(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortFieldUtils.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortFieldUtils.java new file mode 100644 index 0000000000000..609af4f930c36 --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/SortFieldUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.spi.PrestoException; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderBuilder; +import org.apache.iceberg.types.Types; + +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.facebook.presto.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; +import static com.facebook.presto.iceberg.PartitionFields.fromIdentifierToColumn; +import static com.facebook.presto.iceberg.PartitionFields.quotedName; +import static com.facebook.presto.spi.StandardErrorCode.COLUMN_NOT_FOUND; +import static com.facebook.presto.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.lang.String.format; + +public class SortFieldUtils +{ + private SortFieldUtils() {} + + private static final Pattern PATTERN = Pattern.compile( + "\\s*(?" + PartitionFields.IDENTIFIER + ")" + + "(?i:\\s+(?ASC|DESC))?" + + "(?i:\\s+NULLS\\s+(?FIRST|LAST))?" + + "\\s*"); + + public static SortOrder parseSortFields(Schema schema, List fields) + { + SortOrder.Builder builder = SortOrder.builderFor(schema); + parseSortFields(builder, fields); + SortOrder sortOrder; + try { + sortOrder = builder.build(); + } + catch (RuntimeException e) { + throw new PrestoException(INVALID_TABLE_PROPERTY, "Invalid " + SORTED_BY_PROPERTY + " definition", e); + } + + Set baseColumnFieldIds = schema.columns().stream() + .map(Types.NestedField::fieldId) + .collect(toImmutableSet()); + for (SortField field : sortOrder.fields()) { + if (!baseColumnFieldIds.contains(field.sourceId())) { + throw new PrestoException(COLUMN_NOT_FOUND, "Column not found: " + schema.findColumnName(field.sourceId())); + } + } + + return sortOrder; + } + + public static void parseSortFields(SortOrderBuilder sortOrderBuilder, List fields) + { + fields.forEach(field -> parseSortField(sortOrderBuilder, field)); + } + + private static void parseSortField(SortOrderBuilder builder, String field) + { + Matcher matcher = PATTERN.matcher(field); + if (!matcher.matches()) { + throw new IllegalArgumentException(format("Unable to parse sort field: [%s]", field)); + } + + String columnName = fromIdentifierToColumn(matcher.group("identifier")); + boolean ascending; + String ordering = firstNonNull(matcher.group("ordering"), "ASC").toUpperCase(Locale.ENGLISH); + + switch (ordering) { + case "ASC": + ascending = true; + break; + case "DESC": + ascending = false; + break; + default: + throw new IllegalStateException("Unexpected ordering value: " + ordering); + } + + String nullOrderDefault = ascending ? "FIRST" : "LAST"; + + NullOrder nullOrder; + String nullOrderValue = firstNonNull(matcher.group("nullOrder"), nullOrderDefault).toUpperCase(Locale.ENGLISH); + + switch (nullOrderValue) { + case "FIRST": + nullOrder = NullOrder.NULLS_FIRST; + break; + case "LAST": + nullOrder = NullOrder.NULLS_LAST; + break; + default: + throw new IllegalStateException("Unexpected null ordering value: " + nullOrderValue); + } + + if (ascending) { + builder.asc(columnName, nullOrder); + } + else { + builder.desc(columnName, nullOrder); + } + } + + public static List toSortFields(SortOrder spec) + { + return spec.fields().stream() + .map(field -> toSortField(spec, field)) + .collect(toImmutableList()); + } + + private static String toSortField(SortOrder spec, SortField field) + { + verify(field.transform().isIdentity(), "Iceberg sort transforms are not supported"); + + String name = quotedName(spec.schema().findColumnName(field.sourceId())); + return format("%s %s %s", name, field.direction(), field.nullOrder()); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java index 98a0549c8557e..7a742519c1729 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java @@ -326,7 +326,8 @@ private TableScanNode createDeletesTableScan(ImmutableMap assignmentsBuilder = ImmutableMap.builder() diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 8fb1d6dedbb1a..92bc9a8f8a63d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -91,6 +91,7 @@ import java.util.UUID; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -121,6 +122,7 @@ import static java.util.stream.IntStream.range; import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP; import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; @@ -1727,4 +1729,142 @@ private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount) int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP)); assertEquals(totalDeleteFiles, deleteFilesCount); } + @Test + public void testSortByAllTypes() + { + String tableName = "test_sort_by_all_types_" + randomTableSuffix(); + assertUpdate("" + + "CREATE TABLE " + tableName + " (" + + " a_boolean boolean, " + + " an_integer integer, " + + " a_bigint bigint, " + + " a_real real, " + + " a_double double, " + + " a_short_decimal decimal(5,2), " + + " a_long_decimal decimal(38,20), " + + " a_varchar varchar, " + + " a_varbinary varbinary, " + + " a_date date, " + + " a_timestamp timestamp, " + + " an_array array(varchar), " + + " a_map map(integer, varchar) " + + ") " + + "WITH (" + + "sorted_by = ARRAY[" + + " 'a_boolean', " + + " 'an_integer', " + + " 'a_bigint', " + + " 'a_real', " + + " 'a_double', " + + " 'a_short_decimal', " + + " 'a_long_decimal', " + + " 'a_varchar', " + + " 'a_varbinary', " + + " 'a_date', " + + " 'a_timestamp' " + + " ]" + + ")"); + String values = "(" + + "true, " + + "1, " + + "BIGINT '2', " + + "REAL '3.0', " + + "DOUBLE '4.0', " + + "DECIMAL '5.00', " + + "CAST(DECIMAL '6.00' AS decimal(38,20)), " + + "VARCHAR 'seven', " + + "X'88888888', " + + "DATE '2022-09-09', " + + "TIMESTAMP '2022-11-11 11:11:11.000000', " + + "ARRAY[VARCHAR 'four', 'teen'], " + + "MAP(ARRAY[15], ARRAY[VARCHAR 'fifteen']))"; + String highValues = "(" + + "true, " + + "999999999, " + + "BIGINT '999999999', " + + "REAL '999.999', " + + "DOUBLE '999.999', " + + "DECIMAL '999.99', " + + "DECIMAL '6.00', " + + "'zzzzzzzzzzzzzz', " + + "X'FFFFFFFF', " + + "DATE '2099-12-31', " + + "TIMESTAMP '2099-12-31 23:59:59.000000', " + + "ARRAY['zzzz', 'zzzz'], " + + "MAP(ARRAY[999], ARRAY['zzzz']))"; + String lowValues = "(" + + "false, " + + "0, " + + "BIGINT '0', " + + "REAL '0', " + + "DOUBLE '0', " + + "DECIMAL '0', " + + "DECIMAL '0', " + + "'', " + + "X'00000000', " + + "DATE '2000-01-01', " + + "TIMESTAMP '2000-01-01 00:00:00.000000', " + + "ARRAY['', ''], " + + "MAP(ARRAY[0], ARRAY['']))"; + + assertUpdate("INSERT INTO " + tableName + " VALUES " + values + ", " + highValues + ", " + lowValues, 3); + dropTable(getSession(), tableName); + } + + @Test + public void testEmptySortedByList() + { + String tableName = "test_empty_sorted_by_list_" + randomTableSuffix(); + assertUpdate("" + + "CREATE TABLE " + tableName + " (a_boolean boolean, an_integer integer) " + + " WITH (partitioning = ARRAY['an_integer'], sorted_by = ARRAY[])"); + dropTable(getSession(), tableName); + } + + @Test(dataProvider = "sortedTableWithQuotedIdentifierCasing") + public void testCreateSortedTableWithQuotedIdentifierCasing(String columnName, String sortField) + { + String tableName = "test_create_sorted_table_with_quotes_" + randomTableSuffix(); + assertUpdate(format("CREATE TABLE %s (%s bigint) WITH (sorted_by = ARRAY['%s'])", tableName, columnName, sortField)); + dropTable(getSession(), tableName); + } + + @DataProvider(name = "sortedTableWithQuotedIdentifierCasing") + public static Object[][] sortedTableWithQuotedIdentifierCasing() + { + return new Object[][] { + {"col", "col"}, + {"\"col\"", "col"}, + {"col", "\"col\""}, + {"\"col\"", "\"col\""}, + }; + } + + @Test(dataProvider = "sortedTableWithSortTransform") + public void testCreateSortedTableWithSortTransform(String columnName, String sortField) + { + String tableName = "test_sort_with_transform_" + randomTableSuffix(); + String query = format("CREATE TABLE %s (%s TIMESTAMP) WITH (sorted_by = ARRAY['%s'])", tableName, columnName, sortField); + assertQueryFails(query, Pattern.quote(format("Unable to parse sort field: [%s]", sortField))); + } + + @DataProvider(name = "sortedTableWithSortTransform") + public static Object[][] sortedTableWithSortTransform() + { + return new Object[][] { + {"col", "bucket(col, 3)"}, + {"col", "bucket(\"col\", 3)"}, + {"col", "truncate(col, 3)"}, + {"col", "year(col)"}, + {"col", "month(col)"}, + {"col", "date(col)"}, + {"col", "hour(col)"}, + }; + } + + protected void dropTable(Session session, String table) + { + assertUpdate(session, "DROP TABLE " + table); + assertFalse(getQueryRunner().tableExists(session, table)); + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 1bf74d006d8c5..8d4956556d656 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -60,7 +60,8 @@ public void testDefaults() .setMaxManifestCacheSize(IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) .setManifestCacheExpireDuration(IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) .setManifestCacheMaxContentLength(IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT) - .setSplitManagerThreads(Runtime.getRuntime().availableProcessors())); + .setSplitManagerThreads(Runtime.getRuntime().availableProcessors()) + .setSortedWritingEnabled(true)); } @Test @@ -88,6 +89,7 @@ public void testExplicitPropertyMappings() .put("iceberg.io.manifest.cache.expiration-interval-ms", "600000") .put("iceberg.io.manifest.cache.max-content-length", "10485760") .put("iceberg.split-manager-threads", "42") + .put("iceberg.sorted-writing-enabled", "false") .build(); IcebergConfig expected = new IcebergConfig() @@ -111,7 +113,8 @@ public void testExplicitPropertyMappings() .setMaxManifestCacheSize(1048576000) .setManifestCacheExpireDuration(600000) .setManifestCacheMaxContentLength(10485760) - .setSplitManagerThreads(42); + .setSplitManagerThreads(42) + .setSortedWritingEnabled(false); assertFullMapping(properties, expected); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSortFieldUtils.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSortFieldUtils.java new file mode 100644 index 0000000000000..99b255e1a4207 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSortFieldUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.google.common.collect.ImmutableList; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.types.Types; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; + +import java.util.function.Consumer; + +import static com.facebook.presto.iceberg.SortFieldUtils.parseSortFields; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestSortFieldUtils +{ + @Test + public void testParse() + { + assertParse("order_key", sortOrder(builder -> builder.asc("order_key"))); + assertParse("order_key ASC", sortOrder(builder -> builder.asc("order_key"))); + assertParse("order_key ASC NULLS FIRST", sortOrder(builder -> builder.asc("order_key"))); + assertParse("order_key ASC NULLS FIRST", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_FIRST))); + assertParse("order_key ASC NULLS LAST", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST))); + assertParse("order_key DESC", sortOrder(builder -> builder.desc("order_key"))); + assertParse("order_key DESC NULLS FIRST", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST))); + assertParse("order_key DESC NULLS LAST", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_LAST))); + assertParse("order_key DESC NULLS LAST", sortOrder(builder -> builder.desc("order_key"))); + + // lowercase + assertParse("order_key asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST))); + assertParse("order_key desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST))); + assertParse("\"order_key\" asc nulls last", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST))); + assertParse("\"order_key\" desc nulls first", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST))); + + // uppercase + assertParse("ORDER_KEY ASC NULLS LAST", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST))); + assertParse("ORDER_KEY DESC NULLS FIRST", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST))); + assertDoesNotParse("\"ORDER_KEY\" ASC NULLS LAST", "Uppercase characters in identifier '\"ORDER_KEY\"' are not supported."); + assertDoesNotParse("\"ORDER_KEY\" DESC NULLS FIRST", "Uppercase characters in identifier '\"ORDER_KEY\"' are not supported."); + + // mixed case + assertParse("OrDER_keY Asc NullS LAst", sortOrder(builder -> builder.asc("order_key", NullOrder.NULLS_LAST))); + assertParse("OrDER_keY Desc NullS FIrsT", sortOrder(builder -> builder.desc("order_key", NullOrder.NULLS_FIRST))); + assertDoesNotParse("\"OrDER_keY\" Asc NullS LAst", "Uppercase characters in identifier '\"OrDER_keY\"' are not supported."); + assertDoesNotParse("\"OrDER_keY\" Desc NullS FIrsT", "Uppercase characters in identifier '\"OrDER_keY\"' are not supported."); + + assertParse("comment", sortOrder(builder -> builder.asc("comment"))); + assertParse("\"comment\"", sortOrder(builder -> builder.asc("comment"))); + assertParse("\"quoted field\"", sortOrder(builder -> builder.asc("quoted field"))); + assertParse("\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\"", sortOrder(builder -> builder.asc("\"another\" \"quoted\" \"field\""))); + assertParse("\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\" ASC NULLS FIRST ", sortOrder(builder -> builder.asc("\"another\" \"quoted\" \"field\""))); + assertParse("\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\" ASC NULLS LAST ", sortOrder(builder -> builder.asc("\"another\" \"quoted\" \"field\"", NullOrder.NULLS_LAST))); + assertParse("\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\" DESC NULLS FIRST", sortOrder(builder -> builder.desc("\"another\" \"quoted\" \"field\"", NullOrder.NULLS_FIRST))); + assertParse(" comment ", sortOrder(builder -> builder.asc("comment"))); + assertParse("comment ASC", sortOrder(builder -> builder.asc("comment"))); + assertParse(" comment ASC ", sortOrder(builder -> builder.asc("comment"))); + assertParse("comment ASC NULLS FIRST", sortOrder(builder -> builder.asc("comment"))); + assertParse(" comment ASC NULLS FIRST ", sortOrder(builder -> builder.asc("comment"))); + assertParse("comment ASC NULLS FIRST", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_FIRST))); + assertParse(" comment ASC NULLS FIRST ", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_FIRST))); + assertParse("comment ASC NULLS FIRST", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_FIRST))); + assertParse(" comment ASC NULLS FIRST ", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_FIRST))); + assertParse("comment ASC NULLS LAST", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_LAST))); + assertParse(" comment ASC NULLS LAST ", sortOrder(builder -> builder.asc("comment", NullOrder.NULLS_LAST))); + assertParse("comment DESC", sortOrder(builder -> builder.desc("comment"))); + assertParse(" comment DESC ", sortOrder(builder -> builder.desc("comment"))); + assertParse("comment DESC NULLS FIRST", sortOrder(builder -> builder.desc("comment", NullOrder.NULLS_FIRST))); + assertParse(" comment DESC NULLS FIRST ", sortOrder(builder -> builder.desc("comment", NullOrder.NULLS_FIRST))); + assertParse("comment DESC NULLS LAST", sortOrder(builder -> builder.desc("comment", NullOrder.NULLS_LAST))); + assertParse(" comment DESC NULLS LAST ", sortOrder(builder -> builder.desc("comment", NullOrder.NULLS_LAST))); + assertParse("comment DESC NULLS LAST", sortOrder(builder -> builder.desc("comment"))); + assertParse(" comment DESC NULLS LAST ", sortOrder(builder -> builder.desc("comment"))); + + assertDoesNotParse("bucket(comment, 3)"); + assertDoesNotParse("truncate(comment, 3)"); + assertDoesNotParse("year(comment)"); + assertDoesNotParse("month(comment)"); + assertDoesNotParse("day(comment)"); + assertDoesNotParse("hour(comment)"); + + assertDoesNotParse("bucket(comment, 3) ASC"); + assertDoesNotParse("bucket(comment, 3) ASC NULLS LAST"); + } + + private static void assertParse(@Language("SQL") String value, SortOrder expected) + { + assertEquals(expected.fields().size(), 1); + assertEquals(parseField(value), expected); + } + + private static void assertDoesNotParse(@Language("SQL") String value) + { + assertDoesNotParse(value, format("Unable to parse sort field: [%s]", value)); + } + + private static void assertDoesNotParse(@Language("SQL") String value, String expectedMessage) + { + assertThatThrownBy(() -> parseField(value)) + .hasMessage(expectedMessage); + } + + private static SortOrder parseField(String value) + { + return sortOrder(builder -> parseSortFields(builder, ImmutableList.of(value))); + } + + private static SortOrder sortOrder(Consumer consumer) + { + Schema schema = new Schema( + Types.NestedField.required(1, "order_key", Types.LongType.get()), + Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()), + Types.NestedField.required(3, "price", Types.DoubleType.get()), + Types.NestedField.optional(4, "comment", Types.StringType.get()), + Types.NestedField.optional(5, "notes", Types.ListType.ofRequired(6, Types.StringType.get())), + Types.NestedField.optional(7, "quoted field", Types.StringType.get()), + Types.NestedField.optional(8, "quoted ts", Types.TimestampType.withoutZone()), + Types.NestedField.optional(9, "\"another\" \"quoted\" \"field\"", Types.StringType.get())); + + SortOrder.Builder builder = SortOrder.builderFor(schema); + consumer.accept(builder); + return builder.build(); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java index 900a72526e542..8b1192a313708 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java @@ -73,6 +73,7 @@ public enum StandardErrorCode INVALID_TYPE_DEFINITION(0x0000_002F, USER_ERROR), VIEW_NOT_FOUND(0x0000_0030, USER_ERROR), INVALID_LIMIT_CLAUSE(0x0000_0031, USER_ERROR), + COLUMN_NOT_FOUND(0x0000_0032, USER_ERROR), GENERIC_INTERNAL_ERROR(0x0001_0000, INTERNAL_ERROR), TOO_MANY_REQUESTS_FAILED(0x0001_0001, INTERNAL_ERROR, true), @@ -138,8 +139,6 @@ public enum StandardErrorCode MISSING_RESOURCE_GROUP_SELECTOR(0x0002_0010, INTERNAL_ERROR), EXCEEDED_HEAP_MEMORY_LIMIT(0x0002_0011, INSUFFICIENT_RESOURCES), EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT(0x0002_0012, INSUFFICIENT_RESOURCES), - TOO_MANY_SIDECARS(0x0002_0013, INTERNAL_ERROR), - NO_CPP_SIDECARS(0x0002_0014, INTERNAL_ERROR), /**/; // Error code range 0x0003 is reserved for Presto-on-Spark