From e12db45aa1eba8e69a8968c60e117ecec4862f1d Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 12 Dec 2024 11:12:43 +0800 Subject: [PATCH] [GLUTEN-8043] Use spark.shuffle.spill.diskWriteBufferSize in sort-based shuffle (#8203) --- .../spark/shuffle/ColumnarShuffleWriter.scala | 6 +++--- cpp/core/jni/JniWrapper.cc | 4 ++-- cpp/core/shuffle/Options.h | 4 ++-- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 10 +++++----- .../utils/tests/VeloxShuffleWriterTestBase.h | 2 +- .../CelebornColumnarShuffleWriter.scala | 2 +- .../spark/shuffle/GlutenShuffleUtils.scala | 19 +++++++++---------- .../VeloxUniffleColumnarShuffleWriter.java | 2 +- .../org/apache/gluten/GlutenConfig.scala | 1 - 9 files changed, 24 insertions(+), 26 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index e9f821512e16..bb84e3066b5c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -92,8 +92,8 @@ class ColumnarShuffleWriter[K, V]( private val compressionLevel = GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, compressionCodecBackend) - private val compressionBufferSize = - GlutenShuffleUtils.getCompressionBufferSize(conf, compressionCodec) + private val sortEvictBufferSize = + GlutenShuffleUtils.getSortEvictBufferSize(conf, compressionCodec) private val bufferCompressThreshold = GlutenConfig.getConf.columnarShuffleCompressionThreshold @@ -147,7 +147,7 @@ class ColumnarShuffleWriter[K, V]( compressionCodec, compressionCodecBackend, compressionLevel, - compressionBufferSize, + sortEvictBufferSize, bufferCompressThreshold, GlutenConfig.getConf.columnarShuffleCompressionMode, conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt, diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index f75b16b46126..61cedf85763e 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -832,7 +832,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jstring codecJstr, jstring codecBackendJstr, jint compressionLevel, - jint compressionBufferSize, + jint sortEvictBufferSize, jint compressionThreshold, jstring compressionModeJstr, jint sortBufferInitialSize, @@ -864,7 +864,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .startPartitionId = startPartitionId, .shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)), .sortBufferInitialSize = sortBufferInitialSize, - .compressionBufferSize = compressionBufferSize, + .sortEvictBufferSize = sortEvictBufferSize, .useRadixSort = static_cast(useRadixSort)}; // Build PartitionWriterOptions. diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 3a1efdc2ae90..5e98f8291283 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -30,7 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20; static constexpr int64_t kDefaultPushMemoryThreshold = 4096; static constexpr int32_t kDefaultNumSubDirs = 64; static constexpr int32_t kDefaultCompressionThreshold = 100; -static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024; +static constexpr int32_t kDefaultSortEvictBufferSize = 32 * 1024; static const std::string kDefaultCompressionTypeStr = "lz4"; static constexpr int32_t kDefaultBufferAlignment = 64; static constexpr double kDefaultBufferReallocThreshold = 0.25; @@ -66,7 +66,7 @@ struct ShuffleWriterOptions { // Sort shuffle writer. int32_t sortBufferInitialSize = kDefaultSortBufferSize; - int32_t compressionBufferSize = kDefaultCompressionBufferSize; + int32_t sortEvictBufferSize = kDefaultSortEvictBufferSize; bool useRadixSort = kDefaultUseRadixSort; }; diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 793130a58adc..7959adc43a13 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -108,10 +108,10 @@ arrow::Status VeloxSortShuffleWriter::init() { // In Spark, sortedBuffer_ memory and compressionBuffer_ memory are pre-allocated and counted into executor // memory overhead. To align with Spark, we use arrow::default_memory_pool() to avoid counting these memory in Gluten. ARROW_ASSIGN_OR_RAISE( - sortedBuffer_, arrow::AllocateBuffer(options_.compressionBufferSize, arrow::default_memory_pool())); + sortedBuffer_, arrow::AllocateBuffer(options_.sortEvictBufferSize, arrow::default_memory_pool())); rawBuffer_ = sortedBuffer_->mutable_data(); auto compressedBufferLength = partitionWriter_->getCompressedBufferLength( - {std::make_shared(rawBuffer_, options_.compressionBufferSize)}); + {std::make_shared(rawBuffer_, options_.sortEvictBufferSize)}); if (compressedBufferLength.has_value()) { ARROW_ASSIGN_OR_RAISE( compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, arrow::default_memory_pool())); @@ -295,20 +295,20 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_ auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]); addr = pageAddresses_[pageIndex.first] + pageIndex.second; size = *(RowSizeType*)addr; - if (offset + size > options_.compressionBufferSize && offset > 0) { + if (offset + size > options_.sortEvictBufferSize && offset > 0) { sortTime.stop(); RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset)); sortTime.start(); begin = index; offset = 0; } - if (size > options_.compressionBufferSize) { + if (size > options_.sortEvictBufferSize) { // Split large rows. sortTime.stop(); RowSizeType bytes = 0; auto* buffer = reinterpret_cast(addr); while (bytes < size) { - auto rawLength = std::min((uint32_t)options_.compressionBufferSize, size - bytes); + auto rawLength = std::min((uint32_t)options_.sortEvictBufferSize, size - bytes); // Use numRows = 0 to represent a part of row. RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength)); bytes += rawLength; diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index d2995e251c68..9331a7207858 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -261,7 +261,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam @@ -92,7 +91,7 @@ object GlutenShuffleUtils { } } - def getCompressionBufferSize(conf: SparkConf, codec: String): Int = { + def getSortEvictBufferSize(conf: SparkConf, codec: String): Int = { def checkAndGetBufferSize(entry: ConfigEntry[Long]): Int = { val bufferSize = conf.get(entry).toInt if (bufferSize < 4) { @@ -105,7 +104,7 @@ object GlutenShuffleUtils { } else if ("zstd" == codec) { checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE) } else { - GlutenConfig.GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE + checkAndGetBufferSize(SHUFFLE_DISK_WRITE_BUFFER_SIZE) } } diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index d32da8d93b31..7cbf452f76f6 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -129,7 +129,7 @@ public VeloxUniffleColumnarShuffleWriter( compressionCodec, GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null)); compressionBufferSize = - GlutenShuffleUtils.getCompressionBufferSize(sparkConf, compressionCodec); + GlutenShuffleUtils.getSortEvictBufferSize(sparkConf, compressionCodec); } } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 98a9e82b2c09..c2cfff1b7182 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -619,7 +619,6 @@ object GlutenConfig { // Shuffle Writer buffer size. val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize" val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold" - val GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE = 32 * 1024 // Shuffle reader buffer size. val GLUTEN_SHUFFLE_READER_BUFFER_SIZE = "spark.gluten.sql.columnar.shuffle.readerBufferSize"