[GLUTEN-6960][VL] Limit Velox untracked global memory manager's usage #6988

Merged · 7 commits · Aug 26, 2024
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
@@ -38,6 +38,8 @@ const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";

const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";

const std::string kSparkOverheadMemory = "spark.gluten.memoryOverhead.size.in.bytes";

const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes";

const std::string kSparkTaskOffHeapMemory = "spark.gluten.memory.task.offHeap.size.in.bytes";
14 changes: 12 additions & 2 deletions cpp/velox/compute/VeloxBackend.cc
@@ -126,8 +126,18 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
initUdf();
registerSparkTokenizer();

// initialize the global memory manager for current process
facebook::velox::memory::MemoryManager::initialize({});
// Initialize the global memory manager for current process.
auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
int64_t memoryManagerCapacity;
if (sparkOverhead.hasValue()) {
// 0.75 * total overhead memory is used for Velox global memory manager.
// FIXME: Make this configurable.
memoryManagerCapacity = sparkOverhead.value() * 0.75;
} else {
memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
}
LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity;
facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = memoryManagerCapacity});
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
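For reference, here is a minimal Scala sketch of the capacity decision that VeloxBackend::init now makes (the 0.75 factor and the unbounded fallback come from the diff above; the object and method names are hypothetical, not part of this PR):

// Sketch only: mirrors the C++ logic above in Scala.
object VeloxCapacitySketch {
  def globalMemoryManagerCapacity(sparkOverheadBytes: Option[Long]): Long =
    sparkOverheadBytes match {
      // 0.75 of the executor memory overhead is granted to Velox's global memory manager.
      case Some(overhead) => (overhead * 0.75).toLong
      // Overhead not propagated: fall back to an effectively unlimited capacity
      // (facebook::velox::memory::kMaxMemory on the native side).
      case None => Long.MaxValue
    }
}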
@@ -89,8 +89,8 @@ public long borrow(long size) {
.append(
String.format(
"\t%s=%s",
GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
.append(System.lineSeparator())
.append(
String.format(
30 changes: 20 additions & 10 deletions gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
val minOffHeapSize = "1MB"
if (
!conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) &&
(!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
(!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
minOffHeapSize))
) {
throw new GlutenException(
s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
}

// Session's local time zone must be set. If not explicitly set by user, its default
@@ -174,13 +174,23 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)

val onHeapSize: Long =
if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
} else {
// 1GB default
1024 * 1024 * 1024
}

val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)

// FIXME: The following is a workaround. Remove once the causes are fixed.
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString)
logWarning(
"Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" +
" temporary solution to avoid OOM by Velox's global memory pools." +
" See GLUTEN-6960 for more information.")

// If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the user (if any).
// Note that this means that we will IGNORE the off-heap size specified by the user if the
@@ -200,25 +210,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// The 300MB value, unfortunately, is hard-coded in Spark code.
((onHeapSize - (300 * 1024 * 1024)) *
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
} else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
} else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
// Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
// memory pool, regardless of Spark option spark.memory.storageFraction.
conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
} else {
// Default Spark Value.
0L
}

conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)

val offHeapPerTask = offHeapSize / taskSlots
conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)

// If we are using dynamic off-heap sizing, we should also enable off-heap memory
// officially.
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")

// We already sized the off-heap per task in a conservative manner, so we can just
// use it.
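To make the sizing arithmetic above easier to follow, below is a hedged Scala sketch of the off-heap derivation (the 300 MiB reserved constant and the 0.6 default fraction appear in the diff; the helper name and parameters are illustrative, not Gluten API):

object OffHeapSizingSketch {
  // Spark hard-codes 300 MiB of reserved on-heap memory.
  private val ReservedBytes: Long = 300L * 1024 * 1024

  def deriveOffHeapBytes(
      dynamicSizingEnabled: Boolean,
      onHeapBytes: Long,
      dynamicFraction: Double, // GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, default 0.6
      userOffHeapBytes: Option[Long] // spark.memory.offHeap.size, if explicitly set
  ): Long =
    if (dynamicSizingEnabled) {
      ((onHeapBytes - ReservedBytes) * dynamicFraction).toLong
    } else {
      userOffHeapBytes.getOrElse(0L)
    }

  def main(args: Array[String]): Unit = {
    // Example: 8 GiB on-heap, dynamic sizing enabled, 4 task slots per executor.
    val offHeap = deriveOffHeapBytes(dynamicSizingEnabled = true, 8L << 30, 0.6d, None)
    println(s"off-heap total = $offHeap bytes, per task = ${offHeap / 4} bytes")
  }
}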
@@ -18,6 +18,8 @@ package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkMasterRegex}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

object SparkResourceUtil extends Logging {
@@ -80,4 +82,15 @@ object SparkResourceUtil extends Logging {
def isLocalMaster(conf: SparkConf): Boolean = {
Utils.isLocalMaster(conf)
}

def getMemoryOverheadSize(conf: SparkConf): Long = {
val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
val executorMemMib = conf.get(EXECUTOR_MEMORY)
val factor =
conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
(executorMemMib * factor).toLong.max(minMib)
}
ByteUnit.MiB.toBytes(overheadMib)
}
}
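A small worked example of the fallback path in getMemoryOverheadSize above, assuming spark.executor.memoryOverhead is not set (the 0.1 factor and the 384 MiB floor are the defaults read above; the 8 GiB executor size is only illustrative):

object OverheadExample {
  def main(args: Array[String]): Unit = {
    val executorMemMib = 8L * 1024 // spark.executor.memory = 8g, expressed in MiB
    val factor = 0.1d // spark.executor.memoryOverheadFactor default
    val minMib = 384L // spark.executor.minMemoryOverhead default
    val overheadMib = (executorMemMib * factor).toLong.max(minMib) // max(819, 384) = 819 MiB
    val overheadBytes = overheadMib * 1024L * 1024L // same conversion as ByteUnit.MiB.toBytes
    println(s"memory overhead = $overheadMib MiB ($overheadBytes bytes)")
  }
}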
22 changes: 18 additions & 4 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -535,9 +535,11 @@ object GlutenConfig {
val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."

// Private Spark configs.
val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"

// For Soft Affinity Scheduling
@@ -570,6 +572,7 @@

// Added back to Spark Conf during executor initialization
val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor"
val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = "spark.gluten.memoryOverhead.size.in.bytes"
val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes"
val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -762,9 +765,10 @@
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
// datasource config end

GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_ENABLED,
SPARK_OFFHEAP_ENABLED,
SESSION_LOCAL_TIMEZONE.key,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX
@@ -1244,6 +1248,16 @@
.intConf
.createWithDefaultString("-1")

val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
.internal()
.doc(
"Must provide default value since non-execution operations " +
"(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " +
"org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("0")

val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
.internal()
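Finally, a hedged end-to-end sketch of how the overhead setting reaches the native side, pieced together from the diffs in this PR (the SparkSession usage is standard Spark; the flow described in the comments is an interpretation of the changes above, not a definitive statement of Gluten behavior):

import org.apache.spark.sql.SparkSession

object OverheadFlowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]") // local master only so the sketch runs standalone
      .appName("gluten-overhead-flow")
      // User-facing knob consumed by SparkResourceUtil.getMemoryOverheadSize.
      .config("spark.executor.memoryOverhead", "2g")
      .getOrCreate()
    // Per the diffs above: GlutenDriverPlugin derives an overhead value from the setting above
    // and writes it to spark.gluten.memoryOverhead.size.in.bytes (currently overridden to
    // Long.MaxValue as the GLUTEN-6960 workaround); VeloxBackend::init then reads that key
    // (kSparkOverheadMemory) and caps Velox's global memory manager at 0.75 * the value.
    spark.stop()
  }
}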