Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
kecookier committed Dec 10, 2024
1 parent fe2a8e2 commit 7db8935
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarg
return memoryTarget;
}

private static boolean isDynamicCapacity() {
SparkEnv env = SparkEnv.get();
return env != null && env.conf() != null && SparkResourceUtil.getTaskSlots(env.conf()) > 1;
}

public static TreeMemoryTarget newConsumer(
TaskMemoryManager tmm,
String name,
Expand All @@ -63,7 +68,7 @@ public static TreeMemoryTarget newConsumer(
if (GlutenConfig.getConf().memoryIsolation()) {
factory = TreeMemoryConsumers.isolated();
} else {
factory = TreeMemoryConsumers.shared(SparkResourceUtil.getTaskSlots(SparkEnv.get().conf()));
factory = TreeMemoryConsumers.shared(isDynamicCapacity());
}

return factory.newConsumer(tmm, name, spiller, virtualChildren);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ public static Factory isolated() {
* This works as a legacy Spark memory consumer which grants as much as possible of memory
* capacity to each task.
*/
public static Factory shared(long taskSlots) {
boolean isDynamicCapacity = taskSlots > 1;
public static Factory shared() {
return shared(false);
}

public static Factory shared(boolean isDynamicCapacity) {
return createOrGetFactory(TreeMemoryTarget.CAPACITY_UNLIMITED, isDynamicCapacity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testIsolated() {
public void testShared() {
test(
() -> {
final TreeMemoryConsumers.Factory factory = TreeMemoryConsumers.shared(1);
final TreeMemoryConsumers.Factory factory = TreeMemoryConsumers.shared();
final TreeMemoryTarget consumer =
factory.newConsumer(
TaskContext.get().taskMemoryManager(),
Expand All @@ -86,7 +86,7 @@ public void testIsolatedAndShared() {
test(
() -> {
final TreeMemoryTarget shared =
TreeMemoryConsumers.shared(1)
TreeMemoryConsumers.shared()
.newConsumer(
TaskContext.get().taskMemoryManager(),
"FOO",
Expand All @@ -110,7 +110,7 @@ public void testSpill() {
() -> {
final Spillers.AppendableSpillerList spillers = Spillers.appendable();
final TreeMemoryTarget shared =
TreeMemoryConsumers.shared(1)
TreeMemoryConsumers.shared()
.newConsumer(
TaskContext.get().taskMemoryManager(),
"FOO",
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testOverSpill() {
() -> {
final Spillers.AppendableSpillerList spillers = Spillers.appendable();
final TreeMemoryTarget shared =
TreeMemoryConsumers.shared(1)
TreeMemoryConsumers.shared()
.newConsumer(
TaskContext.get().taskMemoryManager(),
"FOO",
Expand Down

0 comments on commit 7db8935

Please sign in to comment.