respect config kSpillReadBufferSize and add spill compression codec
jinchengchenghh committed Dec 13, 2024
1 parent dd72810 commit 62490ac
Showing 4 changed files with 30 additions and 15 deletions.
1 change: 1 addition & 0 deletions cpp/core/config/GlutenConfig.h
@@ -66,6 +66,7 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens";

const std::string kShuffleCompressionCodec = "spark.gluten.sql.columnar.shuffle.codec";
const std::string kShuffleCompressionCodecBackend = "spark.gluten.sql.columnar.shuffle.codecBackend";
+const std::string kShuffleSpillDiskWriteBufferSize = "spark.shuffle.spill.diskWriteBufferSize";
const std::string kQatBackendName = "qat";
const std::string kIaaBackendName = "iaa";

12 changes: 9 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -516,17 +516,23 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
configs[velox::core::QueryConfig::kMaxSpillBytes] =
std::to_string(veloxCfg_->get<uint64_t>(kMaxSpillBytes, 107374182400LL));
configs[velox::core::QueryConfig::kSpillWriteBufferSize] =
-std::to_string(veloxCfg_->get<uint64_t>(kSpillWriteBufferSize, 4L * 1024 * 1024));
+std::to_string(veloxCfg_->get<uint64_t>(kShuffleSpillDiskWriteBufferSize, 1L * 1024 * 1024));
configs[velox::core::QueryConfig::kSpillReadBufferSize] =
std::to_string(veloxCfg_->get<int32_t>(kSpillReadBufferSize, 1L * 1024 * 1024));
configs[velox::core::QueryConfig::kSpillStartPartitionBit] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillStartPartitionBit, 29));
configs[velox::core::QueryConfig::kSpillNumPartitionBits] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillPartitionBits, 3));
configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] =
std::to_string(veloxCfg_->get<uint8_t>(kSpillableReservationGrowthPct, 25));
-configs[velox::core::QueryConfig::kSpillCompressionKind] =
-veloxCfg_->get<std::string>(kSpillCompressionKind, "lz4");
+configs[velox::core::QueryConfig::kSpillPrefixSortEnabled] =
+veloxCfg_->get<std::string>(kSpillPrefixSortEnabled, "false");
+if (veloxCfg_->get<bool>(kSparkShuffleSpillCompress, true)) {
+  configs[velox::core::QueryConfig::kSpillCompressionKind] =
+      veloxCfg_->get<std::string>(kSpillCompressionKind, veloxCfg_->get<std::string>(kCompressionKind, "lz4"));
+} else {
+  configs[velox::core::QueryConfig::kSpillCompressionKind] = "none";
+}
configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterExpectedNumItems, 1000000));
configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
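The branch added above gives spill compression a clear precedence: if spark.shuffle.spill.compress is false, spilling stays uncompressed; otherwise the Gluten-specific spill codec wins, then spark.io.compression.codec, then lz4. A minimal Scala sketch of that precedence follows — illustration only, not Gluten code; SpillCodecSketch and resolveSpillCodec are made-up names.

object SpillCodecSketch {
  // Illustration only: mirrors the precedence encoded by the C++ branch above.
  def resolveSpillCodec(conf: Map[String, String]): String = {
    val spillCompress = conf.getOrElse("spark.shuffle.spill.compress", "true").toBoolean
    if (!spillCompress) {
      "none" // spilling stays uncompressed when Spark disables spill compression
    } else {
      // Gluten-specific codec first, then spark.io.compression.codec, then lz4.
      conf.getOrElse(
        "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec",
        conf.getOrElse("spark.io.compression.codec", "lz4"))
    }
  }

  def main(args: Array[String]): Unit = {
    println(resolveSpillCodec(Map("spark.shuffle.spill.compress" -> "false"))) // none
    println(resolveSpillCodec(Map("spark.io.compression.codec" -> "zstd")))    // zstd
  }
}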
8 changes: 6 additions & 2 deletions cpp/velox/config/VeloxConfig.h
@@ -38,13 +38,17 @@ const std::string kSpillStartPartitionBit = "spark.gluten.sql.columnar.backend.v
const std::string kSpillPartitionBits = "spark.gluten.sql.columnar.backend.velox.spillPartitionBits";
const std::string kMaxSpillRunRows = "spark.gluten.sql.columnar.backend.velox.MaxSpillRunRows";
const std::string kMaxSpillBytes = "spark.gluten.sql.columnar.backend.velox.MaxSpillBytes";
-const std::string kSpillWriteBufferSize = "spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize";
+const std::string kSpillReadBufferSize = "spark.unsafe.sorter.spill.reader.buffer.size";
const uint64_t kMaxSpillFileSizeDefault = 1L * 1024 * 1024 * 1024;

const std::string kSpillableReservationGrowthPct =
"spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct";
-const std::string kSpillCompressionKind = "spark.io.compression.codec";
const std::string kSpillPrefixSortEnabled = "spark.gluten.sql.columnar.backend.velox.spillPrefixsortEnabled";
+// Whether to compress data spilled. Compression will use spark.io.compression.codec or kSpillCompressionKind.
+const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
+const std::string kCompressionKind = "spark.io.compression.codec";
+/// The compression codec to use for spilling. Use kCompressionKind if not set.
+const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
const std::string kMaxExtendedPartialAggregationMemoryRatio =
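With these keys, the spill read/write buffer sizes and the spill codec now follow Spark's own settings, plus one Gluten-specific override for the codec. A hedged usage sketch — the application name and values are illustrative, the settings must be in place before the SparkContext starts (spark-defaults.conf or --conf work the same way), and plain byte counts are used because the raw strings are handed through to the native side.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gluten-spill-config-example")                                            // name is illustrative
  .config("spark.unsafe.sorter.spill.reader.buffer.size", (2 * 1024 * 1024).toString) // spill read buffer
  .config("spark.shuffle.spill.diskWriteBufferSize", (2 * 1024 * 1024).toString)      // spill write buffer
  .config("spark.shuffle.spill.compress", "true")                                     // master switch for spill compression
  .config("spark.io.compression.codec", "lz4")                                        // fallback spill codec
  .config("spark.gluten.sql.columnar.backend.velox.spillCompressionCodec", "zstd")    // spill-specific override
  .getOrCreate()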
24 changes: 14 additions & 10 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)

-def veloxMaxWriteBufferSize: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE)

def veloxBloomFilterExpectedNumItems: Long =
conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)

@@ -571,6 +569,12 @@
val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
+val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = "spark.unsafe.sorter.spill.reader.buffer.size"
+val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize"
+val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
+val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true

// For Soft Affinity Scheduling
// Enable Soft Affinity Scheduling, default value is false
@@ -734,7 +738,14 @@
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
(
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
-GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString)
+GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
+(
+SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
+(
+SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
+(SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
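The keyWithDefault list above forwards plain Spark settings into the native conf map, falling back to the listed default when a key is unset. A toy sketch of that pass-through, assuming a simple Map for the session conf — NativeConfForwardSketch and forwardWithDefaults are hypothetical names, not Gluten code.

object NativeConfForwardSketch {
  import java.util.{HashMap => JHashMap}

  // Sketch only: fold (key, default) pairs into a native conf map,
  // mirroring the keyWithDefault.forEach(...) line above.
  def forwardWithDefaults(
      sessionConf: Map[String, String],
      keyWithDefault: Seq[(String, String)]): JHashMap[String, String] = {
    val nativeConfMap = new JHashMap[String, String]()
    keyWithDefault.foreach { case (key, default) =>
      nativeConfMap.put(key, sessionConf.getOrElse(key, default))
    }
    nativeConfMap
  }

  def main(args: Array[String]): Unit = {
    val native = forwardWithDefaults(
      sessionConf = Map("spark.io.compression.codec" -> "zstd"),
      keyWithDefault = Seq(
        ("spark.unsafe.sorter.spill.reader.buffer.size", (1024 * 1024).toString),
        ("spark.shuffle.spill.diskWriteBufferSize", (1024 * 1024).toString),
        ("spark.shuffle.spill.compress", "true")))
    println(native) // unset keys fall back to the defaults above
  }
}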

@@ -1605,13 +1616,6 @@
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("100G")

-val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE =
-buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize")
-.internal()
-.doc("The maximum write buffer size")
-.bytesConf(ByteUnit.BYTE)
-.createWithDefaultString("4M")

val MAX_PARTITION_PER_WRITERS_SESSION =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
.internal()