Skip to content

Commit

Permalink
[fix](hive) fix Hive catalog missing partitions that have special characte…
Browse files Browse the repository at this point in the history
…rs. (apache#42906)

## Proposed changes
Previously, when processing partition values, the Hive catalog parsed them
using URL decoding. However, this differs from Hive's own encoding scheme,
so some partitions whose values contain special characters were missed
when reading a partitioned table.
This PR fixes that problem.
Ref:
common/src/java/org/apache/hadoop/hive/common/FileUtils.java:`makePartName(List<String>
partCols, List<String> vals,String defaultStr)`
  • Loading branch information
hubgeter authored Nov 1, 2024
1 parent 59fd2ea commit 03c625f
Show file tree
Hide file tree
Showing 7 changed files with 626 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
import com.google.common.collect.RangeMap;
import lombok.Data;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,11 +76,6 @@ public TablePartitionValues(List<String> partitionNames, List<List<String>> part
addPartitions(partitionNames, partitionValues, types);
}

/**
 * Builds the partition values from partition names alone: each name
 * (e.g. "nation=cn/city=beijing") is parsed to recover its column values.
 *
 * @param partitionNames Hive partition names, one entry per partition
 * @param types          types of the partition columns, in column order
 */
public TablePartitionValues(List<String> partitionNames, List<Type> types) {
    this(); // delegate to the no-arg constructor for base initialization
    addPartitions(partitionNames, types);
}

public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) {
Preconditions.checkState(partitionNames.size() == partitionValues.size());
List<String> addPartitionNames = new ArrayList<>();
Expand All @@ -105,10 +96,6 @@ public void addPartitions(List<String> partitionNames, List<List<String>> partit
addPartitionItems(addPartitionNames, addPartitionItems, types);
}

/**
 * Adds partitions given only their names: the value list for each partition
 * is derived by parsing its name, then the main overload does the real work.
 *
 * @param partitionNames Hive partition names, e.g. "nation=cn/city=beijing"
 * @param types          types of the partition columns, in column order
 */
public void addPartitions(List<String> partitionNames, List<Type> types) {
    List<List<String>> partitionValues = new ArrayList<>(partitionNames.size());
    for (String partitionName : partitionNames) {
        partitionValues.add(getHivePartitionValues(partitionName));
    }
    addPartitions(partitionNames, partitionValues, types);
}

private void addPartitionItems(List<String> partitionNames, List<PartitionItem> partitionItems, List<Type> types) {
Preconditions.checkState(partitionNames.size() == partitionItems.size());
Expand Down Expand Up @@ -196,23 +183,6 @@ private ListPartitionItem toListPartitionItem(List<String> partitionValues, List
}
}

/**
 * Parses a Hive partition name such as "nation=cn/city=beijing" into its
 * value list ("cn", "beijing").
 *
 * <p>Hive escapes special characters in partition names (e.g. '=' and '/')
 * as "%XX" hex sequences via FileUtils.escapePathName — this is NOT URL
 * encoding. Using URLDecoder here was a bug: URLDecoder also converts a
 * literal '+' into a space, corrupting values that contain '+'.
 *
 * @param partitionName encoded partition name, "col=val" pairs joined by '/'
 * @return decoded partition values, in the order they appear in the name
 */
private List<String> getHivePartitionValues(String partitionName) {
    return Arrays.stream(partitionName.split("/")).map(part -> {
        String[] kv = part.split("=");
        Preconditions.checkState(kv.length == 2, partitionName);
        return unescapePathName(kv[1]);
    }).collect(Collectors.toList());
}

/**
 * Mirror of org.apache.hadoop.hive.common.FileUtils#unescapePathName:
 * decodes only "%XX" hex escapes, leaving every other character
 * (including '+') intact.
 */
private static String unescapePathName(String path) {
    StringBuilder sb = new StringBuilder(path.length());
    for (int i = 0; i < path.length(); i++) {
        char c = path.charAt(i);
        if (c == '%' && i + 2 < path.length()) {
            int code = -1;
            try {
                code = Integer.parseInt(path.substring(i + 1, i + 3), 16);
            } catch (NumberFormatException e) {
                // not a valid escape sequence; keep the literal '%'
            }
            if (code >= 0) {
                sb.append((char) code);
                i += 2;
                continue;
            }
        }
        sb.append(c);
    }
    return sb.toString();
}

@Data
public static class TablePartitionKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public Long getValue() {
}

private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
// partition name format: nation=cn/city=beijing
// partition name format: nation=cn/city=beijing. Note: the strings returned by `listPartitionNames` are encoded.
List<String> partitionNames = catalog.getClient().listPartitionNames(key.dbName, key.tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("load #{} partitions for {} in catalog {}", partitionNames.size(), key, catalog.getName());
Expand Down Expand Up @@ -281,11 +281,10 @@ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
public ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
// Partition name will be in format: nation=cn/city=beijing
// parse it to get values "cn" and "beijing"
String[] parts = partitionName.split("/");
Preconditions.checkState(parts.length == types.size(), partitionName + " vs. " + types);
List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types);
List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size());
for (String part : parts) {
String partitionValue = HiveUtil.getHivePartitionValue(part);
for (String partitionValue : partitionValues) {
values.add(new PartitionValue(partitionValue, HIVE_DEFAULT_PARTITION.equals(partitionValue)));
}
try {
Expand Down Expand Up @@ -325,9 +324,9 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
StringBuilder sb = new StringBuilder();
Preconditions.checkState(key.getValues().size() == partitionColumns.size());
for (int i = 0; i < partitionColumns.size(); i++) {
sb.append(partitionColumns.get(i).getName());
// Partition name and value may contain special character, like / and so on. Need to encode.
sb.append(FileUtils.escapePathName(partitionColumns.get(i).getName()));
sb.append("=");
// Partition value may contain special character, like / and so on. Need to encode.
sb.append(FileUtils.escapePathName(key.getValues().get(i)));
sb.append("/");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -123,16 +120,22 @@ public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inp
return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
}

public static String getHivePartitionValue(String part) {
String[] kv = part.split("=");
Preconditions.checkState(kv.length == 2, String.format("Malformed partition name %s", part));
try {
// hive partition value maybe contains special characters like '=' and '/'
return URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
// It should not be here
throw new RuntimeException(e);
// "c1=a/c2=b/c3=c" ---> List(["c1","a"], ["c2","b"], ["c3","c"])
// Similar to the `toPartitionValues` method, except that it adds the partition column name.
/**
 * Parses an encoded partition name into (column name, value) pairs, e.g.
 * "c1=a/c2=b/c3=c" ---> List(["c1","a"], ["c2","b"], ["c3","c"]).
 * Similar to {@code toPartitionValues}, except that it also returns the
 * partition column name; both components are unescaped with Hive's rules.
 *
 * @param partitionName encoded partition name, "col=val" pairs joined by '/'
 * @return one two-element array {name, value} per partition column
 */
public static List<String[]> toPartitionColNameAndValues(String partitionName) {
    String[] kvPairs = partitionName.split("/");
    List<String[]> colNameAndValues = new ArrayList<>(kvPairs.length);
    for (String kvPair : kvPairs) {
        String[] colAndVal = kvPair.split("=");
        Preconditions.checkState(colAndVal.length == 2,
                String.format("Malformed partition name %s", kvPair));
        String colName = FileUtils.unescapePathName(colAndVal[0]);
        String value = FileUtils.unescapePathName(colAndVal[1]);
        colNameAndValues.add(new String[] {colName, value});
    }
    return colNameAndValues;
}

// "c1=a/c2=b/c3=c" ---> List("a","b","c")
Expand All @@ -151,6 +154,8 @@ public static List<String> toPartitionValues(String partitionName) {
if (start > partitionName.length()) {
break;
}
// Ref: common/src/java/org/apache/hadoop/hive/common/FileUtils.java,
// makePartName(List<String> partCols, List<String> vals, String defaultStr)
resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end)));
start = end + 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,11 @@ private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) throws An
Map<String, Expr> filterMap = showStmt.getFilterMap();
List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();

// The strings returned by catalog.getClient().listPartitionNames() are encoded.
// Example: insert into tmp partition(pt="1=3/3") values( xxx );
// then `show partitions from tmp` displays: pt=1%3D3%2F3
// TODO: consider whether to decode these via `HiveUtil.toPartitionColNameAndValues`.

if (limit != null && limit.hasLimit() && limit.getOffset() == 0
&& (orderByPairs == null || !orderByPairs.get(0).isDesc())) {
// hmsClient returns unordered partition list, hence if offset > 0 cannot pass limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ private void getPartitionColumnStats() {
for (String names : partitionNames) {
// names is like "date=20230101" for one level partition
// and like "date=20230101/hour=12" for two level partition
String[] parts = names.split("/");
for (String part : parts) {
if (part.startsWith(col.getName())) {
String value = HiveUtil.getHivePartitionValue(part);
List<String[]> parts = HiveUtil.toPartitionColNameAndValues(names);
for (String[] part : parts) {
String colName = part[0];
String value = part[1];
if (colName != null && colName.equals(col.getName())) {
// HIVE_DEFAULT_PARTITION is the hive partition value used when the partition value is not specified.
if (value == null || value.isEmpty() || value.equals(HiveMetaStoreCache.HIVE_DEFAULT_PARTITION)) {
numNulls += 1;
Expand Down
Loading

0 comments on commit 03c625f

Please sign in to comment.