diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 31318ff0aa0c..cd5196aa8c13 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -38,6 +38,8 @@
 const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";
 
 const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";
 
+const std::string kSparkOverheadMemory = "spark.gluten.memoryOverhead.size.in.bytes";
+
 const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes";
 const std::string kSparkTaskOffHeapMemory = "spark.gluten.memory.task.offHeap.size.in.bytes";
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 8dc3ade80dec..dcfadc42c0c9 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -126,8 +126,18 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   initUdf();
   registerSparkTokenizer();
 
-  // initialize the global memory manager for current process
-  facebook::velox::memory::MemoryManager::initialize({});
+  // Initialize the global memory manager for current process.
+  auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
+  int64_t memoryManagerCapacity;
+  if (sparkOverhead.hasValue()) {
+    // 0.75 * total overhead memory is used for Velox global memory manager.
+    // FIXME: Make this configurable.
+    memoryManagerCapacity = sparkOverhead.value() * 0.75;
+  } else {
+    memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
+  }
+  LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity;
+  facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = memoryManagerCapacity});
 }
 
 facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index e6b6ba07eb6b..74e0cbb8779b 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -89,8 +89,8 @@ public long borrow(long size) {
           .append(
               String.format(
                   "\t%s=%s",
-                  GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
-                  SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
+                  GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
+                  SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
           .append(System.lineSeparator())
           .append(
               String.format(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6e3484dfa969..f775d78a15ac 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     val minOffHeapSize = "1MB"
     if (
       !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) &&
-      (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
+      (!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
+        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
          minOffHeapSize))
     ) {
       throw new GlutenException(
-        s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
-          s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
+        s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
+          s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
     }
 
     // Session's local time zone must be set. If not explicitly set by user, its default
@@ -174,13 +174,23 @@
     conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)
 
     val onHeapSize: Long =
-      if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
+      if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
+        conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
       } else {
         // 1GB default
         1024 * 1024 * 1024
       }
 
+    val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)
+
+    // FIXME: The following is a workaround. Remove once the causes are fixed.
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString)
+    logWarning(
+      "Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" +
+        " temporary solution to avoid OOM by Velox's global memory pools." +
+        " See GLUTEN-6960 for more information.")
+
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by the user if the
@@ -200,17 +210,17 @@
         // The 300MB value, unfortunately, is hard-coded in Spark code.
         ((onHeapSize - (300 * 1024 * 1024)) *
           conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
-      } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
+      } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
         // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
         // memory pool, regardless of Spark option spark.memory.storageFraction.
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
+        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
       } else {
         // Default Spark Value.
         0L
       }
 
     conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
-    conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
 
     val offHeapPerTask = offHeapSize / taskSlots
     conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)
@@ -218,7 +228,7 @@
     // If we are using dynamic off-heap sizing, we should also enable off-heap memory
     // officially.
     if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
-      conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
+      conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
 
       // We already sized the off-heap per task in a conservative manner, so we can just
       // use it.
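
The GlutenDriverPlugin hunks above size executor memory on the driver: the executor overhead is recorded under spark.gluten.memoryOverhead.size.in.bytes (currently overridden to Long.MaxValue as a workaround), and the off-heap pool is split evenly across task slots. The standalone Scala sketch below, which is not part of the patch, only illustrates that per-task split; the configuration values are hypothetical, and the task-slot count is simplified to spark.executor.cores instead of the plugin's SparkResourceUtil.getTaskSlots.

import org.apache.spark.SparkConf

object OffHeapSizingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical executor settings, for illustration only.
    val conf = new SparkConf()
      .set("spark.executor.cores", "4")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "4g")

    // Mirrors the plugin's optimistic split: all off-heap memory is shared across task slots.
    val taskSlots = conf.getInt("spark.executor.cores", 1)
    val offHeapSize = conf.getSizeAsBytes("spark.memory.offHeap.size", 0L)
    val offHeapPerTask = offHeapSize / taskSlots

    // With the values above: 4 GiB / 4 slots = 1 GiB per task slot.
    println(s"off-heap per task slot: $offHeapPerTask bytes")
  }
}
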
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index f8c791fe1374..890ea31b6f1b 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -18,6 +18,8 @@ package org.apache.spark.util
 
 import org.apache.spark.{SparkConf, SparkMasterRegex}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf
 
 object SparkResourceUtil extends Logging {
@@ -80,4 +82,15 @@ object SparkResourceUtil extends Logging {
   def isLocalMaster(conf: SparkConf): Boolean = {
     Utils.isLocalMaster(conf)
   }
+
+  def getMemoryOverheadSize(conf: SparkConf): Long = {
+    val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
+      val executorMemMib = conf.get(EXECUTOR_MEMORY)
+      val factor =
+        conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
+      val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+      (executorMemMib * factor).toLong.max(minMib)
+    }
+    ByteUnit.MiB.toBytes(overheadMib)
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9e5161fea472..bb0e683c25eb 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -535,9 +535,11 @@
   val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."
 
   // Private Spark configs.
-  val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
-  val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
-  val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
+  val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
+  val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
+  val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
+  val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+  val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
 
   // For Soft Affinity Scheduling
@@ -570,6 +572,7 @@
 
   // Added back to Spark Conf during executor initialization
   val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor"
+  val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = "spark.gluten.memoryOverhead.size.in.bytes"
   val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
   val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes"
   val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -762,9 +765,10 @@
      SPARK_SQL_PARQUET_COMPRESSION_CODEC,
      // datasource config end
 
+     GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
      GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
      GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
-     GLUTEN_OFFHEAP_ENABLED,
+     SPARK_OFFHEAP_ENABLED,
      SESSION_LOCAL_TIMEZONE.key,
      DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
      SPARK_REDACTION_REGEX
@@ -1244,6 +1248,16 @@
       .intConf
       .createWithDefaultString("-1")
 
+  val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+    buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
   val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
     buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
       .internal()
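
For a rough sense of how these pieces interact end to end, the standalone Scala sketch below walks the overhead fallback (max of executor memory times the overhead factor and 384 MiB, mirroring SparkResourceUtil.getMemoryOverheadSize above) and the 0.75 capacity factor applied in VeloxBackend.cc. It is illustrative only: OverheadSketch and overheadBytes are names invented here, the 4g executor memory is a hypothetical example, and the real code reads Spark's typed config entries rather than raw string keys.

import org.apache.spark.SparkConf

object OverheadSketch {
  // Illustrative re-statement of the overhead fallback: an explicit
  // spark.executor.memoryOverhead wins; otherwise max(executorMemoryMiB * factor, 384 MiB).
  def overheadBytes(conf: SparkConf): Long = {
    val overheadMib: Long =
      if (conf.contains("spark.executor.memoryOverhead")) {
        conf.getSizeAsMb("spark.executor.memoryOverhead")
      } else {
        val executorMemMib = conf.getSizeAsMb("spark.executor.memory", "1g")
        val factor = conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
        (executorMemMib * factor).toLong.max(384L)
      }
    overheadMib * 1024L * 1024L
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.executor.memory", "4g") // hypothetical value
    val overhead = overheadBytes(conf) // 4096 MiB * 0.1 = 409 MiB -> 428867584 bytes
    // VeloxBackend.cc would cap the global Velox memory manager at 0.75 of the overhead.
    val veloxCapacity = (overhead * 0.75).toLong
    println(s"overhead=$overhead bytes, Velox global capacity=$veloxCapacity bytes")
  }
}
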