[GLUTEN-8025][VL] Respect config kSpillReadBufferSize and add spill compression codec #8045

Merged 1 commit on Dec 14, 2024
1 change: 1 addition & 0 deletions cpp/core/config/GlutenConfig.h
@@ -66,6 +66,7 @@ const std::string kUGITokens = "spark.gluten.ugi.tokens";

 const std::string kShuffleCompressionCodec = "spark.gluten.sql.columnar.shuffle.codec";
 const std::string kShuffleCompressionCodecBackend = "spark.gluten.sql.columnar.shuffle.codecBackend";
+const std::string kShuffleSpillDiskWriteBufferSize = "spark.shuffle.spill.diskWriteBufferSize";
 const std::string kQatBackendName = "qat";
 const std::string kIaaBackendName = "iaa";
12 changes: 9 additions & 3 deletions cpp/velox/compute/WholeStageResultIterator.cc
@@ -516,17 +516,23 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
   configs[velox::core::QueryConfig::kMaxSpillBytes] =
       std::to_string(veloxCfg_->get<uint64_t>(kMaxSpillBytes, 107374182400LL));
   configs[velox::core::QueryConfig::kSpillWriteBufferSize] =
-      std::to_string(veloxCfg_->get<uint64_t>(kSpillWriteBufferSize, 4L * 1024 * 1024));
+      std::to_string(veloxCfg_->get<uint64_t>(kShuffleSpillDiskWriteBufferSize, 1L * 1024 * 1024));
+  configs[velox::core::QueryConfig::kSpillReadBufferSize] =
+      std::to_string(veloxCfg_->get<int32_t>(kSpillReadBufferSize, 1L * 1024 * 1024));
   configs[velox::core::QueryConfig::kSpillStartPartitionBit] =
       std::to_string(veloxCfg_->get<uint8_t>(kSpillStartPartitionBit, 29));
   configs[velox::core::QueryConfig::kSpillNumPartitionBits] =
       std::to_string(veloxCfg_->get<uint8_t>(kSpillPartitionBits, 3));
   configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] =
       std::to_string(veloxCfg_->get<uint8_t>(kSpillableReservationGrowthPct, 25));
-  configs[velox::core::QueryConfig::kSpillCompressionKind] =
-      veloxCfg_->get<std::string>(kSpillCompressionKind, "lz4");
   configs[velox::core::QueryConfig::kSpillPrefixSortEnabled] =
       veloxCfg_->get<std::string>(kSpillPrefixSortEnabled, "false");
+  if (veloxCfg_->get<bool>(kSparkShuffleSpillCompress, true)) {
+    configs[velox::core::QueryConfig::kSpillCompressionKind] =
+        veloxCfg_->get<std::string>(kSpillCompressionKind, veloxCfg_->get<std::string>(kCompressionKind, "lz4"));
+  } else {
+    configs[velox::core::QueryConfig::kSpillCompressionKind] = "none";
+  }
   configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
       std::to_string(veloxCfg_->get<int64_t>(kBloomFilterExpectedNumItems, 1000000));
   configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
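The hunk above gates spill compression on spark.shuffle.spill.compress and resolves the codec with a two-level fallback. A minimal standalone sketch of that resolution logic (the map-based `resolveSpillCodec` helper here is illustrative, not Gluten's actual `veloxCfg_` API):

```cpp
#include <cassert>
#include <map>
#include <string>

// Illustrative sketch of the spill codec resolution in the diff above:
//   - spark.shuffle.spill.compress == "false"  -> "none"
//   - otherwise the Velox-specific spillCompressionCodec wins,
//     falling back to spark.io.compression.codec, then to "lz4".
std::string resolveSpillCodec(const std::map<std::string, std::string>& conf) {
  auto get = [&](const std::string& key, const std::string& dflt) {
    auto it = conf.find(key);
    return it == conf.end() ? dflt : it->second;
  };
  if (get("spark.shuffle.spill.compress", "true") == "false") {
    return "none";
  }
  return get(
      "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec",
      get("spark.io.compression.codec", "lz4"));
}
```

With no keys set this yields "lz4", matching the final fallback in the diff; setting spark.io.compression.codec alone changes the spill codec, while the Gluten-specific key overrides both.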
8 changes: 6 additions & 2 deletions cpp/velox/config/VeloxConfig.h
@@ -38,13 +38,17 @@ const std::string kSpillStartPartitionBit = "spark.gluten.sql.columnar.backend.v
 const std::string kSpillPartitionBits = "spark.gluten.sql.columnar.backend.velox.spillPartitionBits";
 const std::string kMaxSpillRunRows = "spark.gluten.sql.columnar.backend.velox.MaxSpillRunRows";
 const std::string kMaxSpillBytes = "spark.gluten.sql.columnar.backend.velox.MaxSpillBytes";
-const std::string kSpillWriteBufferSize = "spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize";
+const std::string kSpillReadBufferSize = "spark.unsafe.sorter.spill.reader.buffer.size";
 const uint64_t kMaxSpillFileSizeDefault = 1L * 1024 * 1024 * 1024;

 const std::string kSpillableReservationGrowthPct =
     "spark.gluten.sql.columnar.backend.velox.spillableReservationGrowthPct";
-const std::string kSpillCompressionKind = "spark.io.compression.codec";
 const std::string kSpillPrefixSortEnabled = "spark.gluten.sql.columnar.backend.velox.spillPrefixsortEnabled";
+// Whether to compress spilled data. When enabled, compression uses kSpillCompressionKind or kCompressionKind.
+const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
+const std::string kCompressionKind = "spark.io.compression.codec";
+/// The compression codec to use for spilling. Falls back to kCompressionKind if not set.
+const std::string kSpillCompressionKind = "spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
 const std::string kMaxPartialAggregationMemoryRatio =
     "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
 const std::string kMaxExtendedPartialAggregationMemoryRatio =
24 changes: 14 additions & 10 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {

   def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)

-  def veloxMaxWriteBufferSize: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE)
-
   def veloxBloomFilterExpectedNumItems: Long =
     conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
@@ -571,6 +569,12 @@ object GlutenConfig {
   val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
   val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = "spark.unsafe.sorter.spill.reader.buffer.size"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize"
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
+  val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true

   // For Soft Affinity Scheduling
   // Enable Soft Affinity Scheduling, default value is false
@@ -734,7 +738,14 @@ object GlutenConfig {
           COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
         (
           GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
-          GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString)
+          GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
Contributor:
Let's check whether spark.unsafe.sorter.spill.read.ahead.enabled is enabled. If it is not, disable the spill read-ahead buffer.

Contributor Author:
The FileInputStream implementation always reads some data ahead, even when spark.unsafe.sorter.spill.read.ahead.enabled is disabled. We could add a new class, FileInputReader, that reads the file only as needed.

Contributor:
What happens if we set the buffer size to 0?

Contributor Author:
Velox does not check that the config value is reasonable. A buffer size of 0 may cause an infinite loop, because each read fetches 0 bytes from the file.
https://github.com/facebookincubator/velox/blob/main/velox/common/file/FileInputStream.cpp#L207

Contributor:
Then let's ignore it for now and open an enhancement issue for it.

Contributor Author:
Yes, added: facebookincubator/velox#11673.
It won't affect us now, because Spark's config validation requires the value to be at least the default:

private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
      .internal()
      .version("2.1.0")
      .bytesConf(ByteUnit.BYTE)
      .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
        s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
      .createWithDefault(1024 * 1024)

I will add a FileReadStream to Velox to support disabling read-ahead.
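The review thread above notes that a 0-byte read buffer could spin forever, since Velox's FileInputStream does not validate the value. A toy sketch (not Velox's actual code) of why: each buffer refill advances the file offset by at most the buffer size, so a size of 0 never makes progress.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative model of a buffered reader's refill loop. Returns the
// number of refills needed to consume a file, or -1 if the buffer size
// cannot make progress (the guard the thread says Velox lacks).
int64_t countRefills(uint64_t fileSize, uint64_t bufferSize) {
  if (bufferSize == 0) {
    return -1;  // offset += 0 on every iteration: the loop would never end
  }
  uint64_t offset = 0;
  int64_t refills = 0;
  while (offset < fileSize) {
    offset += std::min(bufferSize, fileSize - offset);
    ++refills;
  }
  return refills;
}
```

This is also why Spark's checkValue lower bound of 1 MiB (quoted above) sidesteps the problem in practice.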
+        (
+          SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+          SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
+        (
+          SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+          SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
+        (SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
       )
       keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
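The keyWithDefault line above copies each listed Spark key into the native conf map, taking the session's value when set and the listed default otherwise. A small C++ sketch of that merge (the `mergeWithDefaults` helper is hypothetical, named here only for illustration):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Sketch of the keyWithDefault merge performed by the Scala code above:
// for each (key, default) pair, use the session's value when present,
// otherwise fall back to the listed default.
std::map<std::string, std::string> mergeWithDefaults(
    const std::map<std::string, std::string>& sessionConf,
    const std::vector<std::pair<std::string, std::string>>& keyWithDefault) {
  std::map<std::string, std::string> nativeConf;
  for (const auto& [key, dflt] : keyWithDefault) {
    auto it = sessionConf.find(key);
    nativeConf[key] = (it == sessionConf.end()) ? dflt : it->second;
  }
  return nativeConf;
}
```

This is how the new spill keys reach the native side even when the user never sets them: the defaults listed in GlutenConfig.scala are always forwarded.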

@@ -1605,13 +1616,6 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("100G")

-  val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE =
Contributor:
Let's put all configurations in GlutenConfig.scala even if they are not documented in the Gluten doc.

Can you add spillReadBufferSize as well?

-    buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize")
-      .internal()
-      .doc("The maximum write buffer size")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("4M")
-
val MAX_PARTITION_PER_WRITERS_SESSION =
buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
.internal()