[GLUTEN-8025][VL] Respect config kSpillReadBufferSize and add spill compression codec #8045
Merged
```diff
@@ -321,8 +321,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {

   def veloxMaxSpillBytes: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_BYTES)

-  def veloxMaxWriteBufferSize: Long = conf.getConf(COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE)
-
   def veloxBloomFilterExpectedNumItems: Long =
     conf.getConf(COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS)
```
```diff
@@ -571,6 +569,12 @@ object GlutenConfig {
   val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
   val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = "spark.unsafe.sorter.spill.reader.buffer.size"
+  val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize"
+  val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
+  val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
+  val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true

   // For Soft Affinity Scheduling
   // Enable Soft Affinity Scheduling, default value is false
```
```diff
@@ -734,7 +738,14 @@ object GlutenConfig {
         COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString),
       (
         GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
-        GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString)
+        GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
+      (
+        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
+        SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
+      (
+        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
+        SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
+      (SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
```
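The forwarding pattern in this hunk can be sketched as a small standalone example. This is a hypothetical simplification: the real `keyWithDefault` is a larger collection built inside `GlutenConfig`, and `nativeConfMap` is populated from the full Spark session conf; here a plain `Seq` and `Map` stand in for both.

```scala
import java.util.{HashMap => JHashMap}

object ConfForwardingSketch {
  // (key, defaultValue) pairs mirroring the diff: each Spark config is
  // forwarded to the native side, falling back to its default when unset.
  val keyWithDefault: Seq[(String, String)] = Seq(
    ("spark.unsafe.sorter.spill.reader.buffer.size", (1024 * 1024).toString),
    ("spark.shuffle.spill.diskWriteBufferSize", (1024 * 1024).toString),
    ("spark.shuffle.spill.compress", "true")
  )

  // `conf` plays the role of the user-supplied Spark configuration.
  def forward(conf: Map[String, String]): JHashMap[String, String] = {
    val nativeConfMap = new JHashMap[String, String]()
    keyWithDefault.foreach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
    nativeConfMap
  }
}
```

The point of the pattern is that the native backend always sees a value for every key, whether or not the user set one.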
```diff
@@ -1605,13 +1616,6 @@ object GlutenConfig {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("100G")

-  val COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE =
-    buildConf("spark.gluten.sql.columnar.backend.velox.spillWriteBufferSize")
-      .internal()
-      .doc("The maximum write buffer size")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("4M")
-
   val MAX_PARTITION_PER_WRITERS_SESSION =
     buildConf("spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession")
      .internal()
```

Review comment (on the removed `COLUMNAR_VELOX_MAX_SPILL_WRITE_BUFFER_SIZE`): Let's put all configurations in GlutenConfig.scala even if they are not documented in the Gluten docs. Can you add spillReadBufferSize as well?
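The config entry removed above follows a fluent builder chain: `buildConf(key).internal().doc(...).bytesConf(...).createWithDefaultString(...)`. The sketch below is a toy re-implementation of that pattern, not Gluten's real builder (which delegates to Spark's `ConfigBuilder`); it only parses byte-size strings such as "4M" or "100G" so the chain from the diff can be exercised end to end.

```scala
object ConfBuilderSketch {
  // Resulting entry: key, doc string, and the default size in bytes.
  final case class ConfEntry(key: String, doc: String, defaultBytes: Long)

  class ConfBuilder(key: String) {
    private var docString = ""
    def internal(): ConfBuilder = this // marker only, in this sketch
    def doc(d: String): ConfBuilder = { docString = d; this }
    def bytesConf(): ConfBuilder = this // unit handling elided
    def createWithDefaultString(s: String): ConfEntry = {
      // Parse a trailing K/M/G unit suffix; bare numbers are plain bytes.
      val units = Map('K' -> (1L << 10), 'M' -> (1L << 20), 'G' -> (1L << 30))
      val bytes = units.get(s.last) match {
        case Some(m) => s.init.toLong * m
        case None    => s.toLong
      }
      ConfEntry(key, docString, bytes)
    }
  }

  def buildConf(key: String) = new ConfBuilder(key)
}
```

Usage mirrors the removed entry: `buildConf("...spillWriteBufferSize").internal().doc("The maximum write buffer size").bytesConf().createWithDefaultString("4M")` yields a default of 4 MiB.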
Review comment: Let's check whether `spark.unsafe.sorter.spill.read.ahead.enabled` is enabled. If it is not, close the spill read-ahead buffer.
Reply: The `FileInputStream` implementation always reads some buffer ahead, even when `spark.unsafe.sorter.spill.read.ahead.enabled` is disabled. We could add a new class, `FileInputReader`, that reads the file only as needed.
Review comment: What happens if we set the buffer size to 0?
Reply: The config value is not validated. Setting it to 0 may cause an infinite loop, because each iteration reads 0 bytes from the file:
https://github.com/facebookincubator/velox/blob/main/velox/common/file/FileInputStream.cpp#L207
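The failure mode under discussion can be illustrated with a hypothetical JVM-side sketch (the actual bug is in Velox's C++ `FileInputStream`): a read loop with a zero-sized buffer never makes progress, since `read(buf)` on an empty buffer returns 0 forever, so the size is rejected up front.

```scala
import java.io.ByteArrayInputStream

object SpillReadSketch {
  // Hypothetical guard: reject a non-positive buffer size before the read
  // loop, because a zero-length buffer would make `read` return 0 on every
  // iteration and the loop would never terminate.
  def readAll(data: Array[Byte], bufferSize: Int): Int = {
    require(bufferSize > 0, s"spill read buffer size must be positive, got $bufferSize")
    val in = new ByteArrayInputStream(data)
    val buf = new Array[Byte](bufferSize)
    var total = 0
    var n = in.read(buf)
    while (n != -1) { // -1 signals end of stream
      total += n
      n = in.read(buf)
    }
    total
  }
}
```

This mirrors the mitigation mentioned below: Spark itself clamps the configured value, so the unvalidated zero case is not reachable through the forwarded config today.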
Review comment: Then let's ignore it for now and open an enhancement issue for it.
Reply: Yes, added: facebookincubator/velox#11673.
It does not affect us now, because the Spark config requires the value to be at least the default.
I will add `FileReadStream` to Velox to support disabling read-ahead.