Skip to content

Commit

Permalink
[VL] Branch-1.2: port PR #6503 PR #6515 PR #6554 (#6595)
Browse files Browse the repository at this point in the history
* [GLUTEN-6501][VL] Fix the missing fileReadProperties when constructing a LocalFilesNode (#6503)

* [GLUTEN-6477][VL] Fix occasional dead lock during spilling (#6515)

* [VL] Add thread_safe to several VeloxRuntime classes (#6526)

VeloxRuntime is shared by many threads, like task threads or parquet writter threads. We must make sure the objects hold by VeloxRuntime are thread safe.

* [VL] Following #6526, minor fixes and improvements (#6554)

---------

Co-authored-by: zhaokuo <zhaokuo_game@163.com>
Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
Co-authored-by: BInwei Yang <felixybw@apache.org>
  • Loading branch information
4 people authored Jul 25, 2024
1 parent 25f40d4 commit c9f3d89
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo = {
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
val partLists = new JArrayList[String]()
Expand Down Expand Up @@ -183,7 +184,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
preferredLocations.toList.asJava
preferredLocations.toList.asJava,
mapAsJavaMap(properties)
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition: $partition.")
Expand All @@ -209,7 +211,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
filesNode.setFileReadProperties(mapAsJavaMap(scans(i).getProperties))
filesNode.getPaths.forEach(f => files += f)
filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo = {
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
Expand All @@ -78,7 +79,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations.toList.asJava)
preferredLocations.toList.asJava,
mapAsJavaMap(properties)
)
case _ =>
throw new UnsupportedOperationException(s"Unsupported input partition.")
}
Expand Down
29 changes: 21 additions & 8 deletions cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring co
}
}

/*
NOTE: the class must be thread safe
*/

class SparkAllocationListener final : public gluten::AllocationListener {
public:
SparkAllocationListener(JavaVM* vm, jobject jListenerLocalRef, jmethodID jReserveMethod, jmethodID jUnreserveMethod)
Expand Down Expand Up @@ -399,25 +403,34 @@ class SparkAllocationListener final : public gluten::AllocationListener {
env->CallLongMethod(jListenerGlobalRef_, jReserveMethod_, size);
checkException(env);
}
bytesReserved_ += size;
maxBytesReserved_ = std::max(bytesReserved_, maxBytesReserved_);
usedBytes_ += size;
while (true) {
int64_t savedPeakBytes = peakBytes_;
if (usedBytes_ <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
break;
}
}
}

int64_t currentBytes() override {
return bytesReserved_;
return usedBytes_;
}

int64_t peakBytes() override {
return maxBytesReserved_;
return peakBytes_;
}

private:
JavaVM* vm_;
jobject jListenerGlobalRef_;
jmethodID jReserveMethod_;
jmethodID jUnreserveMethod_;
int64_t bytesReserved_ = 0L;
int64_t maxBytesReserved_ = 0L;
const jmethodID jReserveMethod_;
const jmethodID jUnreserveMethod_;
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};

class BacktraceAllocationListener final : public gluten::AllocationListener {
Expand Down
50 changes: 31 additions & 19 deletions cpp/core/memory/AllocationListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>
#include <memory>
#include <mutex>

namespace gluten {

Expand Down Expand Up @@ -46,29 +47,21 @@ class AllocationListener {
};

/// Memory changes will be round to specified block size which aim to decrease delegated listener calls.
// The class must be thread safe
class BlockAllocationListener final : public AllocationListener {
public:
BlockAllocationListener(AllocationListener* delegated, uint64_t blockSize)
BlockAllocationListener(AllocationListener* delegated, int64_t blockSize)
: delegated_(delegated), blockSize_(blockSize) {}

void allocationChanged(int64_t diff) override {
if (diff == 0) {
return;
}
if (diff > 0) {
if (reservationBytes_ - usedBytes_ < diff) {
auto roundSize = (diff + (blockSize_ - 1)) / blockSize_ * blockSize_;
delegated_->allocationChanged(roundSize);
reservationBytes_ += roundSize;
peakBytes_ = std::max(peakBytes_, reservationBytes_);
}
usedBytes_ += diff;
} else {
usedBytes_ += diff;
auto unreservedSize = (reservationBytes_ - usedBytes_) / blockSize_ * blockSize_;
delegated_->allocationChanged(-unreservedSize);
reservationBytes_ -= unreservedSize;
int64_t granted = reserve(diff);
if (granted == 0) {
return;
}
delegated_->allocationChanged(granted);
}

int64_t currentBytes() override {
Expand All @@ -80,11 +73,30 @@ class BlockAllocationListener final : public AllocationListener {
}

private:
AllocationListener* delegated_;
uint64_t blockSize_{0L};
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
uint64_t reservationBytes_{0L};
inline int64_t reserve(int64_t diff) {
std::lock_guard<std::mutex> lock(mutex_);
usedBytes_ += diff;
int64_t newBlockCount;
if (usedBytes_ == 0) {
newBlockCount = 0;
} else {
// ceil to get the required block number
newBlockCount = (usedBytes_ - 1) / blockSize_ + 1;
}
int64_t bytesGranted = (newBlockCount - blocksReserved_) * blockSize_;
blocksReserved_ = newBlockCount;
peakBytes_ = std::max(peakBytes_, usedBytes_);
return bytesGranted;
}

AllocationListener* const delegated_;
const uint64_t blockSize_;
int64_t blocksReserved_{0L};
int64_t usedBytes_{0L};
int64_t peakBytes_{0L};
int64_t reservationBytes_{0L};

mutable std::mutex mutex_;
};

} // namespace gluten
11 changes: 10 additions & 1 deletion cpp/core/memory/MemoryAllocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,16 @@ int64_t ListenableMemoryAllocator::peakBytes() const {
void ListenableMemoryAllocator::updateUsage(int64_t size) {
listener_->allocationChanged(size);
usedBytes_ += size;
peakBytes_ = std::max(peakBytes_, usedBytes_);
while (true) {
int64_t savedPeakBytes = peakBytes_;
if (usedBytes_ <= savedPeakBytes) {
break;
}
// usedBytes_ > savedPeakBytes, update peak
if (peakBytes_.compare_exchange_weak(savedPeakBytes, usedBytes_)) {
break;
}
}
}

bool StdMemoryAllocator::allocate(int64_t size, void** out) {
Expand Down
9 changes: 5 additions & 4 deletions cpp/core/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MemoryAllocator {
virtual int64_t peakBytes() const = 0;
};

// The class must be thread safe
class ListenableMemoryAllocator final : public MemoryAllocator {
public:
explicit ListenableMemoryAllocator(MemoryAllocator* delegated, AllocationListener* listener)
Expand All @@ -69,10 +70,10 @@ class ListenableMemoryAllocator final : public MemoryAllocator {

private:
void updateUsage(int64_t size);
MemoryAllocator* delegated_;
AllocationListener* listener_;
uint64_t usedBytes_{0L};
uint64_t peakBytes_{0L};
MemoryAllocator* const delegated_;
AllocationListener* const listener_;
std::atomic_int64_t usedBytes_{0L};
std::atomic_int64_t peakBytes_{0L};
};

class StdMemoryAllocator final : public MemoryAllocator {
Expand Down
34 changes: 12 additions & 22 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,25 @@ std::shared_ptr<ColumnarBatch> WholeStageResultIterator::next() {
}

namespace {
class ConditionalSuspendedSection {
class SuspendedSection {
public:
ConditionalSuspendedSection(velox::exec::Driver* driver, bool condition) {
if (condition) {
section_ = new velox::exec::SuspendedSection(driver);
}
SuspendedSection() {
reclaimer_->enterArbitration();
}

virtual ~ConditionalSuspendedSection() {
if (section_) {
delete section_;
}
virtual ~SuspendedSection() {
reclaimer_->leaveArbitration();
}

// singleton
ConditionalSuspendedSection(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection(ConditionalSuspendedSection&&) = delete;
ConditionalSuspendedSection& operator=(const ConditionalSuspendedSection&) = delete;
ConditionalSuspendedSection& operator=(ConditionalSuspendedSection&&) = delete;
SuspendedSection(const SuspendedSection&) = delete;
SuspendedSection(SuspendedSection&&) = delete;
SuspendedSection& operator=(const SuspendedSection&) = delete;
SuspendedSection& operator=(SuspendedSection&&) = delete;

private:
velox::exec::SuspendedSection* section_ = nullptr;
// We only use suspension APIs in exec::MemoryReclaimer.
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer_{velox::exec::MemoryReclaimer::create()};
};
} // namespace

Expand All @@ -244,15 +241,8 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
if (spillStrategy_ == "auto") {
int64_t remaining = size - shrunken;
LOG(INFO) << logPrefix << "Trying to request spilling for " << remaining << " bytes...";
// if we are on one of the driver of the spilled task, suspend it
velox::exec::Driver* thisDriver = nullptr;
task_->testingVisitDrivers([&](velox::exec::Driver* driver) {
if (driver->isOnThread()) {
thisDriver = driver;
}
});
// suspend the driver when we are on it
ConditionalSuspendedSection noCancel(thisDriver, thisDriver != nullptr);
SuspendedSection suspender;
velox::exec::MemoryReclaimer::Stats status;
auto* mm = memoryManager_->getMemoryManager();
uint64_t spilledOut = mm->arbitrator()->shrinkCapacity({pool}, remaining); // this conducts spilling
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/memory/VeloxMemoryManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

namespace gluten {

// Make sure the class is thread safe
class VeloxMemoryManager final : public MemoryManager {
public:
VeloxMemoryManager(std::unique_ptr<AllocationListener> listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static LocalFilesNode makeLocalFiles(
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
LocalFilesNode.ReadFileFormat fileFormat,
List<String> preferredLocations) {
List<String> preferredLocations,
Map<String, String> properties) {
return new LocalFilesNode(
index,
paths,
Expand All @@ -43,7 +44,8 @@ public static LocalFilesNode makeLocalFiles(
partitionColumns,
metadataColumns,
fileFormat,
preferredLocations);
preferredLocations,
properties);
}

public static LocalFilesNode makeLocalFiles(String iterPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public enum ReadFileFormat {
List<Map<String, String>> partitionColumns,
List<Map<String, String>> metadataColumns,
ReadFileFormat fileFormat,
List<String> preferredLocations) {
List<String> preferredLocations,
Map<String, String> properties) {
this.index = index;
this.paths.addAll(paths);
this.starts.addAll(starts);
Expand All @@ -78,6 +79,7 @@ public enum ReadFileFormat {
this.partitionColumns.addAll(partitionColumns);
this.metadataColumns.addAll(metadataColumns);
this.preferredLocations.addAll(preferredLocations);
this.fileReadProperties = properties;
}

LocalFilesNode(String iterPath) {
Expand Down Expand Up @@ -109,10 +111,6 @@ private NamedStruct buildNamedStruct() {
return namedStructBuilder.build();
}

public void setFileReadProperties(Map<String, String> fileReadProperties) {
this.fileReadProperties = fileReadProperties;
}

@Override
public List<String> preferredLocations() {
return this.preferredLocations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ trait IteratorApi {
partition: InputPartition,
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String]): SplitInfo
metadataColumnNames: Seq[String],
properties: Map[String, String]): SplitInfo

/** Generate native row partition. */
def genPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(_, getPartitionSchema, fileFormat, getMetadataColumns.map(_.name)))
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
getProperties))
}

override protected def doValidateInternal(): ValidationResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iceberg.DeleteFile;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -47,7 +48,8 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
partitionColumns,
new ArrayList<>(),
fileFormat,
preferredLocations);
preferredLocations,
new HashMap<>());
this.deleteFilesList = deleteFilesList;
}

Expand Down

0 comments on commit c9f3d89

Please sign in to comment.