Add Support for Iceberg table sort orders #21977
base: master
Conversation
Codenotify: Notifying subscribers in CODENOTIFY files for diff eae4696...028f524. No notifications.
Thanks for the documentation! I had a few suggestions to help readability, and one question about the formatting of one of the two code examples.
Also please sign the Presto CLA by selecting the "Please click here to be authorized" link in the earlier comment.
Force-pushed d7a8899 to b65eae5
Thanks for the changes! This is better. I found a nit near the beginning, and I ask if you would recheck the formatting of the first code block example.
Force-pushed 8c6b521 to 57e5e2a
LGTM! (docs)
Pulled the updated branch and did a new local build of the docs. Thanks for the changes!
Force-pushed 57e5e2a to 07ed82a
Took a first look; some problems for discussion.
    CREATE TABLE emp.employees.employee (
        emp_id BIGINT,
        emp_name VARCHAR,
        join_date DATE,
        country VARCHAR)
    WITH (
        sorted_by = ARRAY['join_date']
    )

Sorting can be combined with partitioning on the same column. For example::

    CREATE TABLE emp.employees.employee (
        emp_id BIGINT,
        emp_name VARCHAR,
        join_date DATE,
        country VARCHAR)
    WITH (
        partitioning = ARRAY['month(join_date)'],
        sorted_by = ARRAY['join_date']
    )
Should we add a description about modifiers like DESC NULLS FIRST that may appear after join_date?
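For reference, a sketch of what such a sort order could look like in the table definition. The modifier syntax here mirrors the Trino feature this PR cherry-picks; treat the exact spelling as an assumption until it is documented for this connector::

    CREATE TABLE emp.employees.employee (
        emp_id BIGINT,
        emp_name VARCHAR,
        join_date DATE)
    WITH (
        sorted_by = ARRAY['join_date DESC NULLS FIRST']
    )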
icebergTable.sortOrder().fields().stream()
        .map(SortField::fromIceberg)
        .collect(toImmutableList()));
Seems this doesn't support IcebergNativeMetadata yet. Maybe try using transaction.replaceSortOrder() to support sort_by in IcebergNativeMetadata?
We will analyse and work on this.
Done
@Test
public void testSortByAllTypes()
Maybe move the test cases to IcebergDistributedTestBase after IcebergNativeMetadata is supported?
Will work on this.
Moved the test cases to IcebergDistributedTestBase
assertUpdate("INSERT INTO " + tableName + " VALUES " + values + ", " + highValues + ", " + lowValues, 3);
dropTable(getSession(), tableName);
Could we add more test cases to illustrate the function of sort_by, for example that the data in the written files is indeed sorted? What I feel from the current test cases is that we are mainly doing syntax support testing.
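A minimal sketch of the kind of assertion being requested, reduced to its core: after writing with a sort order, the values read back from each data file should be non-decreasing in the sort column. Reading Iceberg data files is elided here; only the check itself is shown, with made-up sample values.

```java
import java.util.Arrays;
import java.util.List;

public class SortedCheckSketch
{
    // Returns true when values appear in non-decreasing order,
    // i.e. what we expect per file for an ascending sorted_by column.
    static boolean isNonDecreasing(List<Integer> values)
    {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        // Values as they might be read back from one written data file.
        System.out.println(isNonDecreasing(Arrays.asList(1, 2, 2, 5)));
        System.out.println(isNonDecreasing(Arrays.asList(3, 1, 2)));
    }
}
```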
Sure. Will add test cases.
Working on this.
@hantangwangd Added test cases.
Force-pushed 07ed82a to 30ac341
Hi @evanvdia, just wanted to confirm: are you still working on this?
Hi @hantangwangd, yes, working on this.
Force-pushed 7b3bd0f to 3b6903b
@evanvdia is this ready for review? There is a merge conflict.
Force-pushed 3b6903b to ef6df97
@tdcmeehan Merge conflicts are resolved. But Iceberg unit test cases are failing after rebasing.
By the way, please give us a notice when this PR is ready for review, thanks.
    continue;
}

Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
WriteContext writer = createWriter(partitionData);
Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
Suggested change: replace
    Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
with
    Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), transformedPage, position);
After checking the code, I think the problem is introduced by this change.
Thanks @hantangwangd, that was the issue. The changes are done. Will let you know once the PR is ready for review.
Force-pushed ef6df97 to 32840e1
Force-pushed 32840e1 to 5e60f75
Force-pushed 5e60f75 to 7dc619f
presto-hive/src/main/java/com/facebook/presto/hive/SortingFileWriterConfig.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSink.java
presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestSortFieldUtils.java
@evanvdia are you working on this?
Force-pushed 7dc619f to 6b7e988
Force-pushed 6b7e988 to 34c3810
Force-pushed 34c3810 to 763dc56
@evanvdia I see you have pushed a change. Please ping the reviewers when it's ready for review and the comments have been addressed.
Force-pushed 763dc56 to be6f4c7
Force-pushed be6f4c7 to 4f3e8e0
@evanvdia can you investigate the unit test failures? There are quite a few in the Iceberg module likely caused by these new changes.
Feel free to reach out if you need any assistance supporting sort orders on IcebergNativeMetadata.
transaction.replaceSortOrder().apply().fields().stream().map(SortField::fromIceberg)
        .collect(toImmutableList()));
We shouldn't open a transaction action (here, ReplaceSortOrder) without committing it; this will prevent any subsequent actions in the same transaction.
@hantangwangd Added support for sort orders on IcebergNativeMetadata and moved the test cases to IcebergDistributedTestBase. But some of the test cases are failing.
Force-pushed 4f3e8e0 to 5af2017
@ZacBlanco The unit test failures in #21977 (comment) are fixed.
ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
        .collect(toImmutableList());
try {
    replaceSortOrder.commit();
Suggested change: replace

    ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
    List<SortField> sortFields = replaceSortOrder.apply().fields().stream().map(SortField::fromIceberg)
            .collect(toImmutableList());
    try {
        replaceSortOrder.commit();

with

    SortOrder sortOrder = parseSortFields(schema, getSortOrder(tableMetadata.getProperties()));
    ReplaceSortOrder replaceSortOrder = transaction.replaceSortOrder();
    List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
    List<SortField> sortFields = icebergSortFields.stream().map(SortField::fromIceberg)
            .collect(toImmutableList());
    try {
        for (org.apache.iceberg.SortField sortField : icebergSortFields) {
            if (sortField.direction() == SortDirection.ASC) {
                replaceSortOrder.asc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
            }
            else {
                replaceSortOrder.desc(schema.findColumnName(sortField.sourceId()), sortField.nullOrder());
            }
        }
I think a feasible way could go like this, although I haven't carefully checked whether it's the best. The core point is: we should get the sortOrder from tableMetadata and then set it into the create-table transaction through replaceSortOrder. You can have a try.
@hantangwangd Done the changes.
Force-pushed 286cc5e to 9c96c62
I think we need to do a better job testing and documenting the limitations (e.g. ORC only for sorting, from what I can tell?).
Also, what happens if you use Parquet and try to create and insert into a table with a sort order? What if sorted writing is disabled but the table specifies a sort order? Is an error thrown? We should have a test case which clearly shows the expected behavior.
The Iceberg connector supports the creation of sorted tables.
Data in the Iceberg table is sorted as each file is written.

Sorted Iceberg tables can provide a huge increase in performance in query times.
Suggested change: replace
    Sorted Iceberg tables can provide a huge increase in performance in query times.
with
    Sorted Iceberg tables can decrease query times in many cases; however, it will depend on the query shape and cluster configuration.
Let's not try to make bold claims. I would recommend leaving out the adjective "huge" and try to more objectively describe the feature. Especially when making performance claims, there are many factors which can increase (or decrease) performance. Users will get upset if sorted tables don't always improve performance :)
@ZacBlanco It also supports sorting data in Parquet files.
Data sorted in PARQUET file format
Data sorted in ORC file format
If we set sorted_writing_enabled as false and the table specifies a sort_by property, it will not throw any error, but the data in the files is unsorted.
Added test cases for scenarios where sorted writing is disabled, but the table specifies a sort order.
If we set sorted_writing_enabled as false and table specifies a sort_by property, it will not throw any error but data in the files are unsorted.
Is there a reason that this should be both a configuration property and a table property? I don't think it really makes sense to have both. In the case where the configuration is disabled but the table property is set, we will potentially get incorrect results for later queries that expect the data in each file to be sorted.
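To make the interaction concrete, a hypothetical sketch using the property names discussed in this thread (the session-property spelling, including the iceberg. prefix, is an assumption, not verified against this PR)::

    -- The table declares a sort order via the sorted_by table property.
    CREATE TABLE employee (emp_id BIGINT, join_date DATE)
    WITH (sorted_by = ARRAY['join_date']);

    -- With sorted writing disabled, the insert still succeeds, but
    -- (per the discussion above) the written files are unsorted.
    SET SESSION iceberg.sorted_writing_enabled = false;
    INSERT INTO employee VALUES (1, DATE '2024-01-02');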
Sorting is particularly beneficial when the sorted columns have a high cardinality and are used as a filter for selective reads.
I like that this sentence very clearly states the cases where sorting is beneficial 👍
}

@Config("max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
I don't fully understand the description. Can you help clarify this?
The "Maximum number of writer temporary files to read in one pass" refers to a configuration in the Iceberg sorted_by table property implementation; it controls how many of the temporary files created by writers during a table's data writing process are handled at a time when merging or compacting the data.
I just noticed this was basically copied from the previous file it was in, though the config name is changed slightly. I think that we should probably keep the same configuration name, or deprecate the old one, otherwise users might be surprised at the change.
Additionally for the description, I think it can be clarified a little bit. How about:
When writing, the maximum number of temporary files opened at one time to write sorted data.
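To illustrate what a "max open sort files" style limit controls in general, here is a self-contained sketch (not Presto's actual writer code): sorted temporary runs are merged in passes, with at most maxOpenFiles runs open per merge, until a single sorted run remains. The runs are modeled as in-memory lists instead of files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SortMergeSketch
{
    // K-way merge of a batch of sorted runs using a min-heap of
    // (runIndex, positionInRun) cursors.
    static List<Integer> mergeBatch(List<List<Integer>> runs)
    {
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparingInt((int[] e) -> runs.get(e[0]).get(e[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(runs.get(top[0]).get(top[1]));
            if (top[1] + 1 < runs.get(top[0]).size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }

    // Merge repeatedly, opening at most maxOpenFiles runs per pass,
    // until one fully sorted run remains.
    static List<Integer> mergeAll(List<List<Integer>> runs, int maxOpenFiles)
    {
        List<List<Integer>> current = new ArrayList<>(runs);
        while (current.size() > 1) {
            List<List<Integer>> next = new ArrayList<>();
            for (int i = 0; i < current.size(); i += maxOpenFiles) {
                next.add(mergeBatch(current.subList(i, Math.min(i + maxOpenFiles, current.size()))));
            }
            current = next;
        }
        return current.isEmpty() ? new ArrayList<>() : current.get(0);
    }

    public static void main(String[] args)
    {
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 8),
                Arrays.asList(5, 6, 7));
        // With only 2 "files" allowed open, this takes two merge passes.
        System.out.println(mergeAll(runs, 2));
    }
}
```

A higher limit means fewer merge passes (less rereading of temporary data) at the cost of more simultaneously open file handles, which is the trade-off such a config exposes.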
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
Can you add a description here?
Sure @ZacBlanco.
@@ -612,6 +641,12 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));

SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/trinodb/trino/issues/15088)
We should probably add this caveat to the documentation as well as file our own issue in the presto issue tracker and link that instead of Trino's
Created an issue in presto issue tracker.
#24250
@@ -168,6 +172,7 @@ public IcebergHiveMetadata(
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
Why is this added? We already have this field in the superclass.
@ZacBlanco, the superclass doesn't expose any getter, so I kept it here. Do you want me to expose a getter in the base class?
Why does it need a getter though? The field in the superclass is protected so it can be used in subclasses
int maxOpenWriters,
List<SortField> sortOrder,
DataSize sortingFileWriterBufferSize,
int sortingFileWriterMaxOpenFiles,
TypeManager typeManager,
PageSorter pageSorter,
OrcFileWriterFactory orcFileWriterFactory)
There are quite a few fields related to sorting. I'm wondering if it might make sense to put them in their own class together.
Also, digging deeper, I realized that this seems to only be supported on ORC files due to the sorting file writer API. Have we considered enhancing this to support parquet?
@ZacBlanco It also supports sorting data in Parquet files.
Do you want me to make those class changes in this PR, or can I open a separate GitHub issue to track them?
// We only support lowercase quoted identifiers for now.
// See https://github.com/trinodb/trino/issues/12226#issuecomment-1128839259
// TODO: Enhance quoted identifiers support in Iceberg partitioning to support mixed case identifiers
// See https://github.com/trinodb/trino/issues/12668
Can you help more clearly describe the limitation of this comment? Does this mean that identifiers in the CREATE TABLE specifying the sort order have to be quoted and lowercase?
We should clearly document this limitation to users.
Yes @ZacBlanco, it only supports lowercase quoted identifiers for now; otherwise, it throws an error, as shown in the following screenshot.
We should document this limitation and provide a test that asserts the failure
Should we keep the handling logic of the sort order column consistent with that of the table column? For example, when we create a table like this:
create table test("ID" int);
We will succeed, with the column name lowercased to id. Is there a reason the sort-by column could not be handled like this?
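A sketch of the limitation under discussion, with hypothetical table names; the exact error text is not reproduced here since it appears only in a screenshot above::

    -- Lowercase quoted identifier in the sort order: accepted.
    CREATE TABLE t1 (id INT) WITH (sorted_by = ARRAY['"id"']);

    -- Mixed-case quoted identifier: rejected with an error, even though
    -- "ID" as a plain table column would be lowercased to id.
    CREATE TABLE t2 ("ID" INT) WITH (sorted_by = ARRAY['"ID"']);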
Force-pushed 9c96c62 to 1eb811d
Thanks for the fix, the change overall looks good to me. Some little problems and nits.
@@ -76,7 +77,7 @@ public class HiveClientConfig
private int splitLoaderConcurrency = 4;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 100;
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
private NodeSelectionStrategy nodeSelectionStrategy = NO_PREFERENCE;
Why add this field here? Seems it's not used anywhere.
if (sortOrder.isSorted() && sortOrder.fields().stream().allMatch(sortField -> sortField.transform().isIdentity())) {
    List<String> sortColumnNames = toSortFields(sortOrder);
    properties.put(SORTED_BY_PROPERTY, sortColumnNames);
}
Is it appropriate to throw an exception directly if there exist non-identity sorting columns? I see that in other places, such as getSupportedSortFields(...), we filter the sort fields down to those with an identity transform. Considering tables created in an external engine and registered into Presto, that seems more reasonable. Should we unify their behavior? Any misunderstanding, please let me know.
    continue;
}

sortFields.add(SortField.fromIceberg(sortField));
nit: use static import
    writers.set(writerIndex, writer);
}
catch (IOException e) {
    throw new PrestoException(HIVE_WRITER_OPEN_ERROR, e);
Is it appropriate to throw HIVE_WRITER_OPEN_ERROR here? Maybe something like SORT_WRITER_OPEN_ERROR?
public static List<SortField> getSortFields(Table table)
{
    try {
        return table.sortOrder().fields().stream()
                .map(SortField::fromIceberg)
                .collect(toImmutableList());
    }
    catch (Exception e) {
        log.warn(String.format("Unable to fetch sort fields for table %s: %s", table.name(), e.getMessage()));
        return ImmutableList.of();
    }
}
Is it appropriate to directly ignore the sort fields' transforms? As mentioned above, the Iceberg table may be a table created externally and registered into Presto.
List<org.apache.iceberg.SortField> icebergSortFields = sortOrder.fields();
List<SortField> sortFields = icebergSortFields.stream().map(SortField::fromIceberg).collect(toList());
After rechecking the code, I think it's more suitable to use the following statement here to obtain the sort fields:
List<SortField> sortFields = getSupportedSortFields(icebergTable.schema(), sortOrder);
What's your opinion? Any misunderstanding, please tell me.
Force-pushed 1eb811d to f6a663b
Force-pushed f6a663b to 028f524
Description
Add support for the Iceberg connector to create sorted files. The sort order can be configured with the sorted_by table property. When creating the table, you can specify an array of one or more columns to use for sorting.
Cherry-pick of trinodb/trino#14891
Motivation and Context
Issue : #21978
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.