From 6eb0bf023a25840999ebdb0d422e240afccdab5f Mon Sep 17 00:00:00 2001 From: Reetika Agrawal Date: Thu, 12 Sep 2024 18:19:19 +0530 Subject: [PATCH] Set splitsize for hadoop InputFormat to Presto max_split_size --- .../presto/hive/StoragePartitionLoader.java | 4 + .../presto/hive/TestCustomInputSplits.java | 80 +++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestCustomInputSplits.java diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java index 08ab8a7b5de72..6a4052656ff46 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java @@ -64,6 +64,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; import static com.facebook.presto.hive.HiveMetadata.shouldCreateFilesForMissingBuckets; import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize; +import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize; import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable; import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled; @@ -111,6 +112,7 @@ public class StoragePartitionLoader private final Deque> fileIterators; private final boolean schedulerUsesHostAddresses; private final boolean partialAggregationsPushedDown; + private static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; public StoragePartitionLoader( Table table, @@ -185,6 +187,7 @@ private ListenableFuture handleSymlinkTextInputFormat(ExtendedFileSystem fs, JobConf targetJob = toJobConf(targetFilesystem.getConf()); targetJob.setInputFormat(TextInputFormat.class); targetInputFormat.configure(targetJob); + targetJob.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes())); FileInputFormat.setInputPaths(targetJob, targetPath); InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0); @@ -214,6 +217,7 @@ private ListenableFuture handleGetSplitsFromInputFormat(Configuration configu FileInputFormat.setInputPaths(jobConf, path); // SerDes parameters and Table parameters passing into input format fromProperties(schema).forEach(jobConf::set); + jobConf.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes())); InputSplit[] splits = inputFormat.getSplits(jobConf, 0); return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestCustomInputSplits.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestCustomInputSplits.java new file mode 100644 index 0000000000000..5f6c9061080ff --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestCustomInputSplits.java @@ -0,0 +1,80 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.google.common.io.Resources; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HadoopExtendedFileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@Test(singleThreaded = true) +public class TestCustomInputSplits +{ + private static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; + private static final String LOCAL_BLOCK_SIZE = "fs.local.block.size"; + private static final String DFS_BLOCK_SIZE = "dfs.blocksize"; + + @Test + public void testCustomSplitSize() throws Exception + { + long splitSize = 1000; + long blockSize = 500; + String filePath = Resources.getResource("addressbook.parquet").getFile(); + Path path = new Path(filePath); + + TextInputFormat targetInputFormat = new DummyTextInputFormat(); + JobConf jobConf = new JobConf(); + FileInputFormat.setInputPaths(jobConf, path); + + jobConf.set("fs.defaultFS", "file:///"); + jobConf.setClass("fs.file.impl", TestingFileSystem.class, FileSystem.class); + jobConf.setClass("fs.hdfs.impl", TestingFileSystem.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.setLong(LOCAL_BLOCK_SIZE, blockSize); + jobConf.setLong(DFS_BLOCK_SIZE, blockSize); + jobConf.set(SPLIT_MINSIZE, Long.toString(splitSize)); + + InputSplit[] targetSplits = targetInputFormat.getSplits(jobConf, 0); + + assertNotNull(targetSplits); + assertEquals(targetSplits.length, 4); + } + + public class DummyTextInputFormat + extends TextInputFormat + { + protected boolean isSplitable(FileSystem fs, Path file) + { + return true; + } + } + + public static class TestingFileSystem + extends HadoopExtendedFileSystem + { + public TestingFileSystem() + { + super(new LocalFileSystem()); + } + } +}