Skip to content

Commit

Permalink
[GLUTEN-7126][VL][1.2] Port #6698 #7525 #7560 for Uniffle bug fix (#7994)
Browse files Browse the repository at this point in the history
  • Loading branch information
weiting-chen authored Nov 23, 2024
1 parent 773c9b4 commit abf0e1d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ private boolean isDriver() {

// Constructs the Gluten Uniffle shuffle manager, delegating base setup to the
// parent RSS shuffle manager and adjusting the Spark conf for columnar shuffle.
// NOTE(review): this span is diff-rendered — the first conf.set line appears to
// be the pre-change code and the FIXME + RSS_ENABLED line its replacement;
// confirm against the actual file before treating both calls as live code.
public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
// Marks the shuffle as columnar (row-based mode off) for Uniffle.
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
// FIXME: remove this after https://github.com/apache/incubator-uniffle/pull/2193
// Force-enables RSS as a workaround until the upstream Uniffle fix lands.
conf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
}

@Override
Expand All @@ -69,6 +70,13 @@ public <K, V> ShuffleWriter<K, V> getWriter(
} else {
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
// set rss.row.based to false to mark it as columnar shuffle
SparkConf conf =
sparkConf
.clone()
.set(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(),
"false");
return new VeloxUniffleColumnarShuffleWriter<>(
context.partitionId(),
rssHandle.getAppId(),
Expand All @@ -77,7 +85,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
context.taskAttemptId(),
writeMetrics,
this,
sparkConf,
conf,
shuffleWriteClient,
rssHandle,
this::markFailedTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.spark.util.SparkResourceUtil;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -125,9 +126,9 @@ public VeloxUniffleColumnarShuffleWriter(
}

@Override
protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext() && !isMemoryShuffleEnabled) {
super.sendCommit();
protected void writeImpl(Iterator<Product2<K, V>> records) {
if (!records.hasNext()) {
sendCommit();
return;
}
// writer already init
Expand Down Expand Up @@ -185,12 +186,19 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
}
}

long startTime = System.nanoTime();
LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
// If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
sendCommit();
return;
}
long startTime = System.nanoTime();
SplitResult splitResult;
try {
splitResult = jniWrapper.stop(nativeShuffleWriter);
} catch (IOException e) {
throw new RssException(e);
}
splitResult = jniWrapper.stop(nativeShuffleWriter);
columnarDep
.metrics()
.get("splitTime")
Expand All @@ -210,16 +218,21 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
long pushMergedDataTime = System.nanoTime();
// clear all
sendRestBlockAndWait();
if (!isMemoryShuffleEnabled) {
super.sendCommit();
}
sendCommit();
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
"Finish write shuffle with rest write {} ms",
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}

@Override
protected void sendCommit() {
  // Memory shuffle needs no explicit commit handshake; bail out early.
  if (isMemoryShuffleEnabled) {
    return;
  }
  // Otherwise defer to the parent writer's commit protocol.
  super.sendCommit();
}

@Override
public Option<MapStatus> stop(boolean success) {
if (!stopping) {
Expand Down

0 comments on commit abf0e1d

Please sign in to comment.