[CORE][BRANCH-1.2] Port #7861 to fix OOM in shuffle writer (#8078)
* Implement MmapFileStream

* Prefetch based on spark.shuffle.file.buffer

* Fix unit test

* Format code

---------

Co-authored-by: Rong Ma <rong.ma@intel.com>
ccat3z and marin-ma authored Nov 29, 2024
1 parent 54d66ae commit 6e86564
Showing 10 changed files with 185 additions and 10 deletions.
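
Overview: this change replaces the arrow::io::MemoryMappedFile previously used for reading back spill files with a new MmapFileStream that hints the kernel to prefetch pages ahead of the read position (MADV_WILLNEED) and to drop already-consumed pages (MADV_DONTNEED), so merging spills no longer keeps whole spill files resident. Both the prefetch window and the buffered-write size come from spark.shuffle.file.buffer, e.g. --conf spark.shuffle.file.buffer=1m (an illustrative value; Spark's default is 32k).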
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
@@ -64,6 +64,8 @@ const std::string kShuffleCompressionCodecBackend = "spark.gluten.sql.columnar.s
const std::string kQatBackendName = "qat";
const std::string kIaaBackendName = "iaa";

const std::string kShuffleFileBufferSize = "spark.shuffle.file.buffer";

std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength);

7 changes: 7 additions & 0 deletions cpp/core/jni/JniWrapper.cc
@@ -824,6 +824,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
partitionWriterOptions.codecBackend = getCodecBackend(env, codecBackendJstr);
partitionWriterOptions.compressionMode = getCompressionMode(env, compressionModeJstr);
}
const auto& conf = ctx->getConfMap();
{
auto it = conf.find(kShuffleFileBufferSize);
if (it != conf.end()) {
partitionWriterOptions.shuffleFileBufferSize = static_cast<int64_t>(stoi(it->second));
}
}

std::unique_ptr<PartitionWriter> partitionWriter;
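
Note that the JVM side forwards spark.shuffle.file.buffer already converted to bytes (see the GlutenConfig.scala hunk below), so the stoi here parses a plain byte count rather than a size string like "32k".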

4 changes: 3 additions & 1 deletion cpp/core/shuffle/LocalPartitionWriter.cc
@@ -391,7 +391,8 @@ arrow::Status LocalPartitionWriter::openDataFile() {
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_));
if (options_.bufferedWrite) {
// Output stream buffer is neither partition buffer memory nor ipc memory.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout));
ARROW_ASSIGN_OR_RAISE(
dataFileOs_, arrow::io::BufferedOutputStream::Create(options_.shuffleFileBufferSize, pool_, fout));
} else {
dataFileOs_ = fout;
}
@@ -422,6 +423,7 @@ arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
auto spillIter = spills_.begin();
while (spillIter != spills_.end()) {
ARROW_ASSIGN_OR_RAISE(auto st, dataFileOs_->Tell());
(*spillIter)->openForRead(options_.shuffleFileBufferSize);
// Read if partition exists in the spilled file and write to the final file.
while (auto payload = (*spillIter)->nextPayload(partitionId)) {
// May trigger spill during compression.
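
For reference, a minimal standalone sketch of the buffered-write setup above, assuming Arrow's public arrow::io API (the function name openBuffered is illustrative, not part of the commit); the write buffer size, previously hard-coded to 16384 bytes, now comes from options_.shuffleFileBufferSize:

```cpp
#include <memory>
#include <string>

#include <arrow/io/buffered.h>
#include <arrow/io/file.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>

// Open a file for writing behind a buffer of configurable size, mirroring
// LocalPartitionWriter::openDataFile() above. Illustrative sketch only.
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openBuffered(
    const std::string& path, int64_t bufferSize, arrow::MemoryPool* pool) {
  // Raw unbuffered file stream.
  ARROW_ASSIGN_OR_RAISE(auto fout, arrow::io::FileOutputStream::Open(path));
  // Wrap it so small writes are coalesced into bufferSize-sized chunks.
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::io::OutputStream> os,
      arrow::io::BufferedOutputStream::Create(bufferSize, pool, fout));
  return os;
}
```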
3 changes: 3 additions & 0 deletions cpp/core/shuffle/Options.h
@@ -35,6 +35,7 @@ static constexpr int32_t kDefaultBufferAlignment = 64;
static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;

enum ShuffleWriterType { kHashShuffle, kSortShuffle };
enum PartitionWriterType { kLocal, kRss };
@@ -75,6 +76,8 @@ struct PartitionWriterOptions {
int64_t pushBufferMaxSize = kDefaultPushMemoryThreshold;

int64_t sortBufferMaxSize = kDefaultSortBufferThreshold;

int64_t shuffleFileBufferSize = kDefaultShuffleFileBufferSize;
};
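
Note: 32 << 10 is 32 * 1024 = 32768 bytes (32 KiB), matching Spark's default for spark.shuffle.file.buffer.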

struct ShuffleWriterMetrics {
5 changes: 5 additions & 0 deletions cpp/core/shuffle/Payload.cc
@@ -476,6 +476,9 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> UncompressedDiskBlockPayload::read
}

arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
ARROW_RETURN_IF(
inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is uninitialized before calling serialize()."));

if (codec_ == nullptr || type_ == Payload::kUncompressed) {
ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
RETURN_NOT_OK(outputStream->Write(block));
@@ -526,6 +529,8 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload(
: Payload(Type::kCompressed, numRows, isValidityBuffer), inputStream_(inputStream), rawSize_(rawSize) {}

arrow::Status CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
ARROW_RETURN_IF(
inputStream_ == nullptr, arrow::Status::Invalid("inputStream_ is uninitialized before calling serialize()."));
ScopedTimer timer(&writeTime_);
ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
RETURN_NOT_OK(outputStream->Write(block));
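
These two guards turn what would otherwise be a null-pointer dereference into a descriptive error when a disk-block payload is serialized before its input stream exists, which can now happen because opening the spill file for reading is an explicit step (see the Spill.cc and Spill.h hunks below).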
5 changes: 2 additions & 3 deletions cpp/core/shuffle/Spill.cc
@@ -35,7 +35,6 @@ bool Spill::hasNextPayload(uint32_t partitionId) {
}

std::unique_ptr<Payload> Spill::nextPayload(uint32_t partitionId) {
openSpillFile();
if (!hasNextPayload(partitionId)) {
return nullptr;
}
@@ -72,9 +71,9 @@ void Spill::insertPayload(
}
}

void Spill::openSpillFile() {
void Spill::openForRead(uint64_t shuffleFileBufferSize) {
if (!is_) {
GLUTEN_ASSIGN_OR_THROW(is_, arrow::io::MemoryMappedFile::Open(spillFile_, arrow::io::FileMode::READ));
GLUTEN_ASSIGN_OR_THROW(is_, MmapFileStream::open(spillFile_, shuffleFileBufferSize));
rawIs_ = is_.get();
}
}
9 changes: 4 additions & 5 deletions cpp/core/shuffle/Spill.h
@@ -37,6 +37,8 @@ class Spill final {

SpillType type() const;

void openForRead(uint64_t shuffleFileBufferSize);

bool hasNextPayload(uint32_t partitionId);

std::unique_ptr<Payload> nextPayload(uint32_t partitionId);
@@ -57,13 +59,10 @@
};

SpillType type_;
std::shared_ptr<arrow::io::MemoryMappedFile> is_;
std::shared_ptr<gluten::MmapFileStream> is_;
std::list<PartitionPayload> partitionPayloads_{};
std::shared_ptr<arrow::io::MemoryMappedFile> inputStream_{};
std::string spillFile_;

arrow::io::InputStream* rawIs_;

void openSpillFile();
arrow::io::InputStream* rawIs_{nullptr};
};
} // namespace gluten
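
With the implicit openSpillFile() call removed from nextPayload(), callers are now responsible for calling openForRead() before reading payloads, as LocalPartitionWriter::mergeSpills() does in the hunk above; rawIs_ is also now explicitly initialized to nullptr.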
113 changes: 113 additions & 0 deletions cpp/core/shuffle/Utils.cc
@@ -16,10 +16,14 @@
*/

#include "shuffle/Utils.h"
#include <arrow/buffer.h>
#include <arrow/record_batch.h>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fcntl.h>
#include <glog/logging.h>
#include <sys/mman.h>
#include <unistd.h>
#include <iomanip>
#include <iostream>
#include <numeric>
@@ -151,6 +155,14 @@ arrow::Status getLengthBufferAndValueBufferStream(
*compressedLengthPtr = actualLength;
return arrow::Status::OK();
}

uint64_t roundUpToPageSize(uint64_t value) {
static auto pageSize = static_cast<size_t>(arrow::internal::GetPageSize());
static auto pageMask = ~(pageSize - 1);
DCHECK_GT(pageSize, 0);
DCHECK_EQ(pageMask & pageSize, pageSize);
return (value + pageSize - 1) & pageMask;
}
} // namespace

arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeCompressedRecordBatch(
@@ -212,6 +224,107 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
}
return arrow::RecordBatch::Make(writeSchema, 1, {arrays});
}

MmapFileStream::MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size, uint64_t prefetchSize)
: prefetchSize_(roundUpToPageSize(prefetchSize)), fd_(std::move(fd)), data_(data), size_(size) {}

arrow::Result<std::shared_ptr<MmapFileStream>> MmapFileStream::open(const std::string& path, uint64_t prefetchSize) {
ARROW_ASSIGN_OR_RAISE(auto fileName, arrow::internal::PlatformFilename::FromString(path));

ARROW_ASSIGN_OR_RAISE(auto fd, arrow::internal::FileOpenReadable(fileName));
ARROW_ASSIGN_OR_RAISE(auto size, arrow::internal::FileGetSize(fd.fd()));

ARROW_RETURN_IF(size == 0, arrow::Status::Invalid("Cannot mmap an empty file: ", path));

void* result = mmap(nullptr, size, PROT_READ, MAP_PRIVATE, fd.fd(), 0);
if (result == MAP_FAILED) {
return arrow::Status::IOError("Memory mapping file failed: ", ::arrow::internal::ErrnoMessage(errno));
}

return std::make_shared<MmapFileStream>(std::move(fd), static_cast<uint8_t*>(result), size, prefetchSize);
}

arrow::Result<int64_t> MmapFileStream::actualReadSize(int64_t nbytes) {
if (nbytes < 0 || pos_ > size_) {
return arrow::Status::IOError("Read out of range. Offset: ", pos_, " Size: ", nbytes, " File Size: ", size_);
}
return std::min(size_ - pos_, nbytes);
}

bool MmapFileStream::closed() const {
return data_ == nullptr;
}

void MmapFileStream::advance(int64_t length) {
// Data before pos_ is no longer needed; release it in prefetch-sized chunks.
auto purgeLength = (pos_ - posRetain_) / prefetchSize_ * prefetchSize_;
if (purgeLength > 0) {
int ret = madvise(data_ + posRetain_, purgeLength, MADV_DONTNEED);
if (ret != 0) {
LOG(WARNING) << "fadvise failed " << ::arrow::internal::ErrnoMessage(errno);
}
posRetain_ += purgeLength;
}

pos_ += length;
}

void MmapFileStream::willNeed(int64_t length) {
// Skip if already fetched
if (pos_ + length <= posFetch_) {
return;
}

// Round up to multiple of prefetchSize
auto fetchLen = ((length + prefetchSize_ - 1) / prefetchSize_) * prefetchSize_;
fetchLen = std::min(size_ - pos_, fetchLen);
int ret = madvise(data_ + posFetch_, fetchLen, MADV_WILLNEED);
if (ret != 0) {
LOG(WARNING) << "madvise willneed failed: " << ::arrow::internal::ErrnoMessage(errno);
}

posFetch_ += fetchLen;
}

arrow::Status MmapFileStream::Close() {
if (data_ != nullptr) {
int result = munmap(data_, size_);
if (result != 0) {
LOG(WARNING) << "munmap failed";
}
data_ = nullptr;
}

return fd_.Close();
}

arrow::Result<int64_t> MmapFileStream::Tell() const {
return pos_;
}

arrow::Result<int64_t> MmapFileStream::Read(int64_t nbytes, void* out) {
ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes));

if (nbytes > 0) {
memcpy(out, data_ + pos_, nbytes);
advance(nbytes);
}

return nbytes;
}

arrow::Result<std::shared_ptr<arrow::Buffer>> MmapFileStream::Read(int64_t nbytes) {
ARROW_ASSIGN_OR_RAISE(nbytes, actualReadSize(nbytes));

if (nbytes > 0) {
auto buffer = std::make_shared<arrow::Buffer>(data_ + pos_, nbytes);
willNeed(nbytes);
advance(nbytes);
return buffer;
} else {
return std::make_shared<arrow::Buffer>(nullptr, 0);
}
}
} // namespace gluten
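
As a quick sanity check of the page rounding used by MmapFileStream, here is a standalone sketch assuming a 4 KiB page size (the real helper queries arrow::internal::GetPageSize()):

```cpp
#include <cassert>
#include <cstdint>

// Round value up to the next multiple of pageSize; the bit mask is valid
// because page sizes are powers of two. Mirrors roundUpToPageSize() above.
uint64_t roundUpToPageSizeDemo(uint64_t value, uint64_t pageSize = 4096) {
  const uint64_t pageMask = ~(pageSize - 1);
  return (value + pageSize - 1) & pageMask;
}

int main() {
  assert(roundUpToPageSizeDemo(1) == 4096); // partial page -> one full page
  assert(roundUpToPageSizeDemo(4096) == 4096); // exact multiple is unchanged
  assert(roundUpToPageSizeDemo(5000) == 8192); // spills into a second page
  return 0;
}
```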

std::string gluten::generateUuid() {
35 changes: 35 additions & 0 deletions cpp/core/shuffle/Utils.h
@@ -72,4 +72,39 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

std::shared_ptr<arrow::Buffer> zeroLengthNullBuffer();

// MmapFileStream is used to optimize sequential file reading. It uses madvise
// to prefetch upcoming pages and to promptly release pages already consumed.
class MmapFileStream : public arrow::io::InputStream {
public:
MmapFileStream(arrow::internal::FileDescriptor fd, uint8_t* data, int64_t size, uint64_t prefetchSize);

static arrow::Result<std::shared_ptr<MmapFileStream>> open(const std::string& path, uint64_t prefetchSize = 0);

arrow::Result<int64_t> Tell() const override;

arrow::Status Close() override;

arrow::Result<int64_t> Read(int64_t nbytes, void* out) override;

arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;

bool closed() const override;

private:
arrow::Result<int64_t> actualReadSize(int64_t nbytes);

void advance(int64_t length);

void willNeed(int64_t length);

// Page-aligned prefetch size
const int64_t prefetchSize_;
arrow::internal::FileDescriptor fd_;
uint8_t* data_ = nullptr;
int64_t size_;
int64_t pos_ = 0;
int64_t posFetch_ = 0;
int64_t posRetain_ = 0;
};

} // namespace gluten
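
A hypothetical usage sketch of the stream declared above (drainFile and the 1 MiB prefetch window are illustrative, not part of the commit). Note that advance() divides by prefetchSize_, so callers should pass a positive prefetch size, as Spill::openForRead() does with shuffleFileBufferSize:

```cpp
#include <memory>
#include <string>

#include <arrow/buffer.h>
#include <arrow/result.h>
#include <arrow/status.h>

#include "shuffle/Utils.h" // declares gluten::MmapFileStream

// Sequentially drain a file through MmapFileStream. Each Read(nbytes) returns
// a zero-copy buffer aliasing the mapped region, hints the kernel to prefetch
// upcoming pages (MADV_WILLNEED), and drops pages already read (MADV_DONTNEED).
arrow::Status drainFile(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto in, gluten::MmapFileStream::open(path, 1 << 20));
  while (true) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> block, in->Read(64 * 1024));
    if (block->size() == 0) {
      break; // end of file
    }
    // ... consume block before the next Read, which may purge its pages ...
  }
  return in->Close();
}
```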
12 changes: 11 additions & 1 deletion shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -17,7 +17,7 @@
package org.apache.gluten

import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.sql.internal.SQLConf

import com.google.common.collect.ImmutableList
@@ -523,6 +523,7 @@
val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"

// For Soft Affinity Scheduling
// Enable Soft Affinity Scheduling, defalut value is false
@@ -667,6 +668,15 @@
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))

conf
.get(SPARK_SHUFFLE_FILE_BUFFER)
.foreach(
v =>
nativeConfMap
.put(
SPARK_SHUFFLE_FILE_BUFFER,
(JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024).toString))
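
For example, with spark.shuffle.file.buffer=32k, JavaUtils.byteStringAs("32k", ByteUnit.KiB) returns 32, so 32 * 1024 = 32768 is forwarded to the native side, where JniWrapper.cc (above) parses it as a byte count.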

// Backend's dynamic session conf only.
conf
.filter(entry => entry._1.startsWith(backendPrefix) && !SQLConf.isStaticConfigKey(entry._1))
