From cabf1a9bf0ad2f48558fbb5af64f748397431a4b Mon Sep 17 00:00:00 2001
From: Reetika Agrawal
Date: Thu, 12 Sep 2024 18:19:19 +0530
Subject: [PATCH] Set split size for Hadoop InputFormat to Presto max_split_size

---
 .../presto/hive/StoragePartitionLoader.java        |  4 ++
 .../hive/TestBackgroundHiveSplitLoader.java        | 56 +++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
index 676a68f2757e4..7b9f722e701f7 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java
@@ -64,6 +64,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static com.facebook.presto.hive.HiveMetadata.shouldCreateFilesForMissingBuckets;
 import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
+import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
 import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable;
 import static com.facebook.presto.hive.HiveSessionProperties.isOrderBasedExecutionEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
@@ -111,6 +112,7 @@ public class StoragePartitionLoader
     private final Deque<Iterator<InternalHiveSplit>> fileIterators;
     private final boolean schedulerUsesHostAddresses;
     private final boolean partialAggregationsPushedDown;
+    private static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
 
     public StoragePartitionLoader(
             Table table,
@@ -185,6 +187,7 @@ private ListenableFuture<?> handleSymlinkTextInputFormat(ExtendedFileSystem fs,
         JobConf targetJob = toJobConf(targetFilesystem.getConf());
         targetJob.setInputFormat(TextInputFormat.class);
         targetInputFormat.configure(targetJob);
+        targetJob.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes()));
         FileInputFormat.setInputPaths(targetJob, targetPath);
         InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0);
 
@@ -214,6 +217,7 @@ private ListenableFuture<?> handleGetSplitsFromInputFormat(Configuration configu
         FileInputFormat.setInputPaths(jobConf, path);
         // SerDes parameters and Table parameters passing into input format
         fromProperties(schema).forEach(jobConf::set);
+        jobConf.set(SPLIT_MINSIZE, Long.toString(getMaxSplitSize(session).toBytes()));
         InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
 
         return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
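
For context on why a minimum split size is the right knob here: with the old
org.apache.hadoop.mapred API used by both code paths above, FileInputFormat
sizes each split as max(minSize, min(goalSize, blockSize)), where goalSize is
the total input size divided by the requested number of splits (passed as 0
above and clamped to 1). Setting SPLIT_MINSIZE to the session's max_split_size
therefore keeps the InputFormat from emitting splits smaller than Presto's
maximum, however small the filesystem block size is. A minimal sketch of that
arithmetic follows; the class name and all sizes are illustrative, not taken
from this patch:

    // Sketch of org.apache.hadoop.mapred.FileInputFormat split sizing.
    // All values are hypothetical and only illustrate the formula.
    public final class SplitSizeSketch
    {
        public static void main(String[] args)
        {
            long totalSize = 4000;   // total input bytes
            long blockSize = 500;    // filesystem block size
            long minSize = 1000;     // mapreduce.input.fileinputformat.split.minsize
            int requestedSplits = 0; // getSplits(job, 0), as in the patch

            long goalSize = totalSize / Math.max(1, requestedSplits);
            // computeSplitSize(goalSize, minSize, blockSize) in the mapred API:
            long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
            System.out.println(splitSize); // 1000: minSize wins over the small block size
        }
    }
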
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java
index 5633478e9b10b..f20a155c3480e 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java
@@ -18,6 +18,7 @@
 import com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType;
 import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
 import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
+import com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration;
 import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
 import com.facebook.presto.hive.metastore.Column;
 import com.facebook.presto.hive.metastore.Partition;
@@ -33,6 +34,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Resources;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,8 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HadoopExtendedFileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -51,9 +55,11 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Progressable;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -82,6 +88,7 @@
 import static com.facebook.presto.hive.StoragePartitionLoader.BucketSplitInfo.createBucketSplitInfo;
 import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
 import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
+import static com.facebook.presto.hive.util.ConfigurationUtils.toJobConf;
 import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -91,6 +98,7 @@
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import static org.testng.FileAssert.fail;
@@ -125,6 +133,8 @@ public class TestBackgroundHiveSplitLoader
 
     private static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty());
     private static final Table PARTITIONED_TABLE = table(PARTITION_COLUMNS, BUCKET_PROPERTY);
+    private static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
+    private static final String BLOCK_SIZE = "fs.local.block.size";
 
     @Test
     public void testNoPathFilter()
@@ -366,6 +376,52 @@ public void testSplittableNotCheckedOnSmallFiles()
         }
     }
 
+    private FileSystem getFileSystem(HdfsEnvironment environment, String user, Path path)
+            throws IOException
+    {
+        return environment.getFileSystem(user, path, new Configuration(false));
+    }
+
+    private Configuration getHadoopConfiguration()
+    {
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+        hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+        return new HiveCachingHdfsConfiguration.CachingJobConf((factoryConfig, factoryUri) -> {
+            FileSystem localFileSystem = new LocalFileSystem();
+            try {
+                localFileSystem.initialize(URI.create("file:///"), hadoopConf);
+            }
+            catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return new HadoopExtendedFileSystem(localFileSystem);
+        }, hadoopConf);
+    }
+
+    @Test
+    public void testCustomSplitSize() throws Exception
+    {
+        long splitSize = 1000;
+        long blockSize = 500;
+        String filePath = Resources.getResource("addressbook.parquet").getFile();
+
+        Path path = new Path(filePath);
+
+        TextInputFormat targetInputFormat = new TextInputFormat();
+        FileSystem fileSystem = path.getFileSystem(getHadoopConfiguration());
+        JobConf targetJob = toJobConf(fileSystem.getConf());
+        targetJob.setLong(BLOCK_SIZE, blockSize);
+        targetJob.set(SPLIT_MINSIZE, Long.toString(splitSize));
+        targetInputFormat.configure(targetJob);
+
+        FileInputFormat.setInputPaths(targetJob, path);
+        InputSplit[] targetSplits = targetInputFormat.getSplits(targetJob, 0);
+
+        assertNotNull(targetSplits);
+        assertEquals(targetSplits.length, 4);
+    }
+
     public static final class TestSplittableFailureInputFormat
             extends FileInputFormat<Void, Void>
     {
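
The expected count of 4 in testCustomSplitSize follows from the same formula:
minSize = 1000 overrides the 500-byte block size, and the mapred getSplits()
loop keeps carving off splitSize bytes while the remainder exceeds SPLIT_SLOP
(1.1) times splitSize. A sketch of that loop, assuming a length of 3956 bytes
for addressbook.parquet (the actual resource size is not stated in the patch;
any length between roughly 3.1 KB and 4.1 KB yields 4 splits):

    // Sketch of the split-count loop in org.apache.hadoop.mapred.FileInputFormat.
    // The 3956-byte length is an assumed stand-in for addressbook.parquet.
    public final class SplitCountSketch
    {
        public static void main(String[] args)
        {
            long splitSize = 1000; // max(minSize = 1000, min(goalSize, blockSize = 500))
            long remaining = 3956;
            int splits = 0;
            while ((double) remaining / splitSize > 1.1) { // SPLIT_SLOP = 1.1
                remaining -= splitSize;
                splits++;
            }
            if (remaining != 0) {
                splits++; // final short split (956 bytes here)
            }
            System.out.println(splits); // 4, matching assertEquals(targetSplits.length, 4)
        }
    }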