Skip to content

Commit

Permalink
Parallelize Compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Jan 29, 2024
1 parent f976231 commit b30921c
Showing 1 changed file with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class QueryCompiler {
/** A flag to externally disable parallel compilation. */
public static boolean DISABLE_PARALLEL_COMPILE = false;

private static final Logger log = LoggerFactory.getLogger(QueryCompiler.class);
/**
* We pick a number just shy of 65536, leaving a little elbow room for good luck.
Expand All @@ -68,8 +73,6 @@ public class QueryCompiler {
public static final String FORMULA_PREFIX = "io.deephaven.temp";
public static final String DYNAMIC_GROOVY_CLASS_PREFIX = "io.deephaven.dynamic";

public static boolean COMPILE_AS_BATCH = true;

public static QueryCompiler create(File cacheDirectory, ClassLoader classLoader) {
return new QueryCompiler(cacheDirectory, classLoader, true);
}
Expand Down Expand Up @@ -535,7 +538,21 @@ private void compileHelper(
requests.get(ii).className, requests.get(ii).classBody, packageName[ii], fqClassName[ii]);
}
}
maybeCreateClass(helperRequests);

long startTm = System.nanoTime();
int parallelismFactor = ForkJoinPool.getCommonPoolParallelism();
int requestsPerTask = Math.max(32, (helperRequests.length + parallelismFactor - 1) / parallelismFactor);
if (DISABLE_PARALLEL_COMPILE || parallelismFactor == 1 || requestsPerTask >= helperRequests.length) {
maybeCreateClass(helperRequests, 0, helperRequests.length);
} else {
int numTasks = (helperRequests.length + requestsPerTask - 1) / requestsPerTask;
IntStream.range(0, numTasks).parallel().forEach(jobId -> {
maybeCreateClass(helperRequests,
jobId * requestsPerTask,
Math.min(helperRequests.length, (jobId + 1) * requestsPerTask));
});
}
log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm)/1e9)).append("s.").endl();

// We could be running on a screwy filesystem that is slow (e.g. NFS). If we wrote a file and can't load it
// ... then give the filesystem some time. All requests should use the same deadline.
Expand All @@ -554,6 +571,7 @@ private void compileHelper(
Class<?> result = tryLoadClassByFqName(fqClassName[ii], request.parameterClasses);
try {
while (result == null && System.currentTimeMillis() < deadline) {
// noinspection BusyWait
Thread.sleep(CODEGEN_LOOP_DELAY_MS);
result = tryLoadClassByFqName(fqClassName[ii], request.parameterClasses);
}
Expand Down Expand Up @@ -777,7 +795,10 @@ public JavaSourceFromString makeSource() {
}
}

private void maybeCreateClass(@NotNull final CreateClassHelperRequest[] requests) {
private void maybeCreateClass(
@NotNull final CreateClassHelperRequest[] requests,
final int startInclusive,
final int endExclusive) {
// Get the destination root directory (e.g. /tmp/workspace/cache/classes) and populate it with the package
// directories (e.g. io/deephaven/test) if they are not already there. This will be useful later.
// Also create a temp directory e.g. /tmp/workspace/cache/classes/temporaryCompilationDirectory12345
Expand All @@ -803,7 +824,7 @@ private void maybeCreateClass(@NotNull final CreateClassHelperRequest[] requests
}

try {
maybeCreateClassHelper(requests, rootPathAsString, tempDirAsString);
maybeCreateClassHelper(requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive);
} finally {
try {
FileUtils.deleteRecursively(new File(tempDirAsString));
Expand All @@ -816,7 +837,9 @@ private void maybeCreateClass(@NotNull final CreateClassHelperRequest[] requests
private void maybeCreateClassHelper(
@NotNull final CreateClassHelperRequest[] requests,
@NotNull final String rootPathAsString,
@NotNull final String tempDirAsString) {
@NotNull final String tempDirAsString,
final int startInclusive,
final int endExclusive) {
final StringWriter compilerOutput = new StringWriter();

final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
Expand All @@ -837,7 +860,9 @@ private void maybeCreateClassHelper(
null,
compilerOptions,
null,
Arrays.stream(requests).map(CreateClassHelperRequest::makeSource).collect(Collectors.toList()))
Arrays.stream(requests, startInclusive, endExclusive)
.map(CreateClassHelperRequest::makeSource)
.collect(Collectors.toList()))
.call();
} catch (final Throwable err) {
exceptionThrown = true;
Expand All @@ -861,7 +886,7 @@ private void maybeCreateClassHelper(
// class files}
// We want to atomically move it to e.g.
// /tmp/workspace/cache/classes/io/deephaven/test/cm12862183232603186v52_0/{various class files}
for (final CreateClassHelperRequest request : requests) {
Arrays.stream(requests, startInclusive, endExclusive).forEach(request -> {
Path srcDir = Paths.get(tempDirAsString, request.splitPackageName);
Path destDir = Paths.get(rootPathAsString, request.splitPackageName);
try {
Expand All @@ -876,7 +901,7 @@ private void maybeCreateClassHelper(
ioe);
}
}
}
});
}

/**
Expand Down

0 comments on commit b30921c

Please sign in to comment.