Skip to content

Commit

Permalink
[GLUTEN-8043] Use spark.shuffle.spill.diskWriteBufferSize in sort-ba…
Browse files Browse the repository at this point in the history
…sed shuffle (#8203)
  • Loading branch information
marin-ma authored Dec 12, 2024
1 parent da37af1 commit e12db45
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class ColumnarShuffleWriter[K, V](
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, compressionCodecBackend)

private val compressionBufferSize =
GlutenShuffleUtils.getCompressionBufferSize(conf, compressionCodec)
private val sortEvictBufferSize =
GlutenShuffleUtils.getSortEvictBufferSize(conf, compressionCodec)

private val bufferCompressThreshold =
GlutenConfig.getConf.columnarShuffleCompressionThreshold
Expand Down Expand Up @@ -147,7 +147,7 @@ class ColumnarShuffleWriter[K, V](
compressionCodec,
compressionCodecBackend,
compressionLevel,
compressionBufferSize,
sortEvictBufferSize,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jstring codecJstr,
jstring codecBackendJstr,
jint compressionLevel,
jint compressionBufferSize,
jint sortEvictBufferSize,
jint compressionThreshold,
jstring compressionModeJstr,
jint sortBufferInitialSize,
Expand Down Expand Up @@ -864,7 +864,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.startPartitionId = startPartitionId,
.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)),
.sortBufferInitialSize = sortBufferInitialSize,
.compressionBufferSize = compressionBufferSize,
.sortEvictBufferSize = sortEvictBufferSize,
.useRadixSort = static_cast<bool>(useRadixSort)};

// Build PartitionWriterOptions.
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
static constexpr int32_t kDefaultSortEvictBufferSize = 32 * 1024;
static const std::string kDefaultCompressionTypeStr = "lz4";
static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
Expand Down Expand Up @@ -66,7 +66,7 @@ struct ShuffleWriterOptions {

// Sort shuffle writer.
int32_t sortBufferInitialSize = kDefaultSortBufferSize;
int32_t compressionBufferSize = kDefaultCompressionBufferSize;
int32_t sortEvictBufferSize = kDefaultSortEvictBufferSize;
bool useRadixSort = kDefaultUseRadixSort;
};

Expand Down
10 changes: 5 additions & 5 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ arrow::Status VeloxSortShuffleWriter::init() {
// In Spark, sortedBuffer_ memory and compressionBuffer_ memory are pre-allocated and counted into executor
// memory overhead. To align with Spark, we use arrow::default_memory_pool() to avoid counting these memory in Gluten.
ARROW_ASSIGN_OR_RAISE(
sortedBuffer_, arrow::AllocateBuffer(options_.compressionBufferSize, arrow::default_memory_pool()));
sortedBuffer_, arrow::AllocateBuffer(options_.sortEvictBufferSize, arrow::default_memory_pool()));
rawBuffer_ = sortedBuffer_->mutable_data();
auto compressedBufferLength = partitionWriter_->getCompressedBufferLength(
{std::make_shared<arrow::Buffer>(rawBuffer_, options_.compressionBufferSize)});
{std::make_shared<arrow::Buffer>(rawBuffer_, options_.sortEvictBufferSize)});
if (compressedBufferLength.has_value()) {
ARROW_ASSIGN_OR_RAISE(
compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, arrow::default_memory_pool()));
Expand Down Expand Up @@ -295,20 +295,20 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
addr = pageAddresses_[pageIndex.first] + pageIndex.second;
size = *(RowSizeType*)addr;
if (offset + size > options_.compressionBufferSize && offset > 0) {
if (offset + size > options_.sortEvictBufferSize && offset > 0) {
sortTime.stop();
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
sortTime.start();
begin = index;
offset = 0;
}
if (size > options_.compressionBufferSize) {
if (size > options_.sortEvictBufferSize) {
// Split large rows.
sortTime.stop();
RowSizeType bytes = 0;
auto* buffer = reinterpret_cast<uint8_t*>(addr);
while (bytes < size) {
auto rawLength = std::min<RowSizeType>((uint32_t)options_.compressionBufferSize, size - bytes);
auto rawLength = std::min<RowSizeType>((uint32_t)options_.sortEvictBufferSize, size - bytes);
// Use numRows = 0 to represent a part of row.
RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength));
bytes += rawLength;
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams

ShuffleTestParams params = GetParam();
shuffleWriterOptions_.useRadixSort = params.useRadixSort;
shuffleWriterOptions_.compressionBufferSize = params.compressionBufferSize;
shuffleWriterOptions_.sortEvictBufferSize = params.compressionBufferSize;
partitionWriterOptions_.compressionType = params.compressionType;
switch (partitionWriterOptions_.compressionType) {
case arrow::Compression::UNCOMPRESSED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ abstract class CelebornColumnarShuffleWriter[K, V](
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

protected val compressionBufferSize: Int =
GlutenShuffleUtils.getCompressionBufferSize(conf, customizedCompressionCodec)
GlutenShuffleUtils.getSortEvictBufferSize(conf, customizedCompressionCodec)

protected val bufferCompressThreshold: Int =
GlutenConfig.getConf.columnarShuffleCompressionThreshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ object GlutenShuffleUtils {
}
}

def checkCodecValues(codecConf: String, codec: String, validValues: Set[String]): Unit = {
if (!validValues.contains(codec)) {
throw new IllegalArgumentException(
s"The value of $codecConf should be one of " +
s"${validValues.mkString(", ")}, but was $codec")
}
}

def getCompressionCodec(conf: SparkConf): String = {
def checkCodecValues(codecConf: String, codec: String, validValues: Set[String]): Unit = {
if (!validValues.contains(codec)) {
throw new IllegalArgumentException(
s"The value of $codecConf should be one of " +
s"${validValues.mkString(", ")}, but was $codec")
}
}
val glutenConfig = GlutenConfig.getConf
glutenConfig.columnarShuffleCodec match {
case Some(codec) =>
Expand Down Expand Up @@ -92,7 +91,7 @@ object GlutenShuffleUtils {
}
}

def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
def getSortEvictBufferSize(conf: SparkConf, codec: String): Int = {
def checkAndGetBufferSize(entry: ConfigEntry[Long]): Int = {
val bufferSize = conf.get(entry).toInt
if (bufferSize < 4) {
Expand All @@ -105,7 +104,7 @@ object GlutenShuffleUtils {
} else if ("zstd" == codec) {
checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE)
} else {
GlutenConfig.GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE
checkAndGetBufferSize(SHUFFLE_DISK_WRITE_BUFFER_SIZE)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public VeloxUniffleColumnarShuffleWriter(
compressionCodec,
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
compressionBufferSize =
GlutenShuffleUtils.getCompressionBufferSize(sparkConf, compressionCodec);
GlutenShuffleUtils.getSortEvictBufferSize(sparkConf, compressionCodec);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ object GlutenConfig {
// Shuffle Writer buffer size.
val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize"
val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold"
val GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE = 32 * 1024

// Shuffle reader buffer size.
val GLUTEN_SHUFFLE_READER_BUFFER_SIZE = "spark.gluten.sql.columnar.shuffle.readerBufferSize"
Expand Down

0 comments on commit e12db45

Please sign in to comment.