[SPARK-48642][CORE] False SparkOutOfMemoryError caused by killing task on spilling

### What changes were proposed in this pull request?

Throw `RuntimeException` instead of `SparkOutOfMemoryError` when an underlying call throws `InterruptedIOException` in `TaskMemoryManager#trySpillAndAcquire`.
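
As a hedged illustration of the handling pattern (simplified names, not the verbatim Spark source): `SpillHandlingSketch`, `MemoryConsumerLike`, and the `IllegalStateException` stand-in for `SparkOutOfMemoryError` below are illustrative only.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;

// Sketch of how the revised trySpillAndAcquire distinguishes a task kill
// from a genuine spill failure. Not the verbatim TaskMemoryManager code.
class SpillHandlingSketch {
  interface MemoryConsumerLike {
    long spill(long requested) throws IOException;
  }

  static long trySpill(MemoryConsumerLike consumer, long requested) {
    try {
      return consumer.spill(requested);
    } catch (ClosedByInterruptException | InterruptedIOException e) {
      // The task thread was interrupted, e.g. a speculative attempt killed
      // because another attempt succeeded. Surface a plain RuntimeException
      // so the task ends up KILLED rather than FAILED with a false OOM.
      throw new RuntimeException(e.getMessage());
    } catch (IOException e) {
      // A real I/O failure during spill still maps to an OOM-style error
      // (SparkOutOfMemoryError in the real code; a stand-in is used here).
      throw new IllegalStateException(
        "error while calling spill() on " + consumer, e);
    }
  }
}
```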

### Why are the changes needed?

A false `SparkOutOfMemoryError` was identified in our production Spark jobs; the case is similar to SPARK-20250.

```
2024-06-17 06:03:20 CST Executor INFO - Executor is trying to kill task 1580.1 in stage 48.0 (TID 59486), reason: another attempt succeeded
2024-06-17 06:03:20 CST TaskMemoryManager ERROR - error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter7cfefcb7
java.io.InterruptedIOException: null
	at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:234) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:272) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:251) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.io.DataInputStream.readInt(DataInputStream.java:393) ~[?:?]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:80) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:626) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:204) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:227) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
	...
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638) ~[?:?]
	at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:231) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	... 111 more
2024-06-17 06:03:21 CST Executor ERROR - Exception in task 1580.1 in stage 48.0 (TID 59486)
org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter7cfefcb7 : null
	at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:253) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
	...
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
```
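
What the trace shows: killing the task interrupts its thread; `ReadAheadInputStream#waitForAsyncReadComplete` converts the resulting `InterruptedException` into an `InterruptedIOException`; and, before this patch, `trySpillAndAcquire` treated that like any other spill failure and wrapped it in `SparkOutOfMemoryError`. Below is a simplified sketch of the wrapping step, assuming abbreviated fields and omitting the rest of the class; it is not the full `ReadAheadInputStream` source.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of the interrupt-to-InterruptedIOException conversion
// seen in the "Caused by" frames of the trace above.
final class ReadAheadWaitSketch {
  private final ReentrantLock stateChangeLock = new ReentrantLock();
  private final Condition asyncReadComplete = stateChangeLock.newCondition();
  private volatile boolean readInProgress = true; // flipped by the reader thread

  void waitForAsyncReadComplete() throws InterruptedIOException {
    stateChangeLock.lock();
    try {
      while (readInProgress) {
        // Blocks until the background read-ahead thread signals completion;
        // interrupted here when the executor kills the task.
        asyncReadComplete.await();
      }
    } catch (InterruptedException e) {
      // Wrap the interrupt as an IO-flavored exception; this is the
      // InterruptedIOException that spill() propagates to trySpillAndAcquire.
      InterruptedIOException iio = new InterruptedIOException(e.getMessage());
      iio.initCause(e);
      throw iio;
    } finally {
      stateChangeLock.unlock();
    }
  }
}
```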

### Does this PR introduce _any_ user-facing change?

Yes. A task killed while spilling no longer reports a false `SparkOutOfMemoryError`, so the killed task's status is KILLED instead of FAILED.

### How was this patch tested?

Covered by existing tests, to ensure the change breaks nothing.
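
For illustration only, a hypothetical JUnit 5 check of the new behavior, written against the `SpillHandlingSketch` from the first section above; no such test is part of this commit.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.InterruptedIOException;
import org.junit.jupiter.api.Test;

class SpillHandlingSketchTest {
  @Test
  void interruptedSpillIsNotReportedAsOutOfMemory() {
    // A consumer whose spill() behaves as if the task was killed mid-read.
    SpillHandlingSketch.MemoryConsumerLike killed = requested -> {
      throw new InterruptedIOException("task killed");
    };
    RuntimeException e = assertThrows(RuntimeException.class,
        () -> SpillHandlingSketch.trySpill(killed, 1024L));
    // A plain RuntimeException (kill path), not the OOM-style subclass.
    assertEquals(RuntimeException.class, e.getClass());
  }
}
```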

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47000 from pan3793/SPARK-48642.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
pan3793 authored and dongjoon-hyun committed Jun 17, 2024
1 parent 0864bbe commit 00a96bb
Showing 1 changed file (`TaskMemoryManager.java`) with 2 additions and 1 deletion.
```
@@ -18,6 +18,7 @@
 package org.apache.spark.memory;

 import javax.annotation.concurrent.GuardedBy;
+import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
@@ -244,7 +245,7 @@ private long trySpillAndAcquire(
         cList.remove(idx);
         return 0;
       }
-    } catch (ClosedByInterruptException e) {
+    } catch (ClosedByInterruptException | InterruptedIOException e) {
       // This called by user to kill a task (e.g: speculative task).
       logger.error("error while calling spill() on {}", e,
         MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
```
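
Not shown in this hunk: per the description above, the body of this `catch` block rethrows as a plain `RuntimeException`, while a genuine `IOException` during spill continues to be reported as `SparkOutOfMemoryError`.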
