Skip to content

Commit

Permalink
Synchronize JFM Stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Jan 29, 2024
1 parent b30921c commit 351a281
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.configuration.Configuration;
import io.deephaven.configuration.DataDir;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.util.SynchronizedJavaFileManager;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.ByteUtils;
Expand Down Expand Up @@ -46,7 +47,8 @@

public class QueryCompiler {
/** A flag to externally disable parallel compilation. */
public static boolean DISABLE_PARALLEL_COMPILE = false;
public static volatile boolean DISABLE_PARALLEL_COMPILE = false;
public static volatile boolean DISABLE_SHARED_COMPILER = false;

private static final Logger log = LoggerFactory.getLogger(QueryCompiler.class);
/**
Expand Down Expand Up @@ -535,24 +537,15 @@ private void compileHelper(
for (int ii = 0, jj = 0; ii < requests.size(); ++ii) {
if (!compiled[ii]) {
helperRequests[jj++] = new CreateClassHelperRequest(
requests.get(ii).className, requests.get(ii).classBody, packageName[ii], fqClassName[ii]);
ii,
requests.get(ii).className,
requests.get(ii).classBody,
packageName[ii],
fqClassName[ii]);
}
}

long startTm = System.nanoTime();
int parallelismFactor = ForkJoinPool.getCommonPoolParallelism();
int requestsPerTask = Math.max(32, (helperRequests.length + parallelismFactor - 1) / parallelismFactor);
if (DISABLE_PARALLEL_COMPILE || parallelismFactor == 1 || requestsPerTask >= helperRequests.length) {
maybeCreateClass(helperRequests, 0, helperRequests.length);
} else {
int numTasks = (helperRequests.length + requestsPerTask - 1) / requestsPerTask;
IntStream.range(0, numTasks).parallel().forEach(jobId -> {
maybeCreateClass(helperRequests,
jobId * requestsPerTask,
Math.min(helperRequests.length, (jobId + 1) * requestsPerTask));
});
}
log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm)/1e9)).append("s.").endl();
maybeCreateClass(helperRequests);

// We could be running on a screwy filesystem that is slow (e.g. NFS). If we wrote a file and can't load it
// ... then give the filesystem some time. All requests should use the same deadline.
Expand Down Expand Up @@ -758,16 +751,18 @@ public CharSequence getCharContent(boolean ignoreEncodingErrors) {
}

private static class CreateClassHelperRequest {
final int requestIndex;
final String fqClassName;
final String finalCode;
final String[] splitPackageName;

// String className, String code, String packageName, String fqClassName
private CreateClassHelperRequest(
final int requestIndex,
@NotNull final String className,
@NotNull final String code,
@NotNull final String packageName,
@NotNull final String fqClassName) {
this.requestIndex = requestIndex;
this.fqClassName = fqClassName;

finalCode = makeFinalCode(className, code, packageName);
Expand Down Expand Up @@ -796,9 +791,7 @@ public JavaSourceFromString makeSource() {
}

private void maybeCreateClass(
@NotNull final CreateClassHelperRequest[] requests,
final int startInclusive,
final int endExclusive) {
@NotNull final CreateClassHelperRequest[] requests) {
// Get the destination root directory (e.g. /tmp/workspace/cache/classes) and populate it with the package
// directories (e.g. io/deephaven/test) if they are not already there. This will be useful later.
// Also create a temp directory e.g. /tmp/workspace/cache/classes/temporaryCompilationDirectory12345
Expand All @@ -823,57 +816,110 @@ private void maybeCreateClass(
throw new UncheckedIOException(ioe);
}

final JavaCompiler compiler;
final JavaFileManager fileManager;

if (!DISABLE_SHARED_COMPILER) {
compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?");
}

fileManager = new SynchronizedJavaFileManager(
compiler.getStandardFileManager(null, null, null));
} else {
compiler = null;
fileManager = null;
}

boolean exceptionCaught = false;
try {
maybeCreateClassHelper(requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive);
long startTm = System.nanoTime();
int parallelismFactor = ForkJoinPool.getCommonPoolParallelism();
log.warn().append("Compiling with parallelism factor of: ").append(parallelismFactor).endl();
int requestsPerTask = Math.max(32, (requests.length + parallelismFactor - 1) / parallelismFactor);
if (DISABLE_PARALLEL_COMPILE || parallelismFactor == 1 || requestsPerTask >= requests.length) {
maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString,
0, requests.length);
} else {
int numTasks = (requests.length + requestsPerTask - 1) / requestsPerTask;
IntStream.range(0, numTasks).parallel().forEach(jobId -> {
final int startInclusive = jobId * requestsPerTask;
final int endExclusive = Math.min(requests.length, (jobId + 1) * requestsPerTask);
maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString,
startInclusive, endExclusive);
});
}
log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm) / 1e9)).append("s.").endl();
} catch (final Throwable t) {
exceptionCaught = true;
throw t;
} finally {
try {
FileUtils.deleteRecursively(new File(tempDirAsString));
} catch (Exception e) {
// ignore errors here
}

if (fileManager != null) {
try {
fileManager.close();
} catch (IOException ioe) {
if (!exceptionCaught) {
// noinspection ThrowFromFinallyBlock
throw new UncheckedIOException("Could not close JavaFileManager", ioe);
}
}
}
}
}

private void maybeCreateClassHelper(
JavaCompiler compiler,
JavaFileManager fileManager,
@NotNull final CreateClassHelperRequest[] requests,
@NotNull final String rootPathAsString,
@NotNull final String tempDirAsString,
final int startInclusive,
final int endExclusive) {
final StringWriter compilerOutput = new StringWriter();

final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?");
if (DISABLE_SHARED_COMPILER) {
compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?");
}

fileManager = compiler.getStandardFileManager(null, null, null);
}

final String classPathAsString = getClassPath() + File.pathSeparator + getJavaClassPath();
final List<String> compilerOptions = Arrays.asList("-d", tempDirAsString, "-cp", classPathAsString);

final JavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);

boolean result;
boolean exceptionThrown = false;
try {
result = compiler.getTask(compilerOutput,
fileManager,
null,
compilerOptions,
null,
Arrays.stream(requests, startInclusive, endExclusive)
.map(CreateClassHelperRequest::makeSource)
.collect(Collectors.toList()))
fileManager,
diagnostic -> log.error().append("Reporting Error: ").append(diagnostic.toString()).endl(),
compilerOptions,
null,
Arrays.stream(requests, startInclusive, endExclusive)
.map(CreateClassHelperRequest::makeSource)
.collect(Collectors.toList()))
.call();
} catch (final Throwable err) {
} catch (final Throwable t) {
exceptionThrown = true;
throw err;
throw t;
} finally {
try {
fileManager.close();
} catch (final IOException ioe) {
if (!exceptionThrown) {
// noinspection ThrowFromFinallyBlock
throw new UncheckedIOException("Could not close JavaFileManager", ioe);
if (DISABLE_SHARED_COMPILER) {
try {
fileManager.close();
} catch (IOException ioe) {
if (!exceptionThrown) {
// noinspection ThrowFromFinallyBlock
throw new UncheckedIOException("Could not close JavaFileManager", ioe);
}
}
}
}
Expand All @@ -887,8 +933,8 @@ private void maybeCreateClassHelper(
// We want to atomically move it to e.g.
// /tmp/workspace/cache/classes/io/deephaven/test/cm12862183232603186v52_0/{various class files}
Arrays.stream(requests, startInclusive, endExclusive).forEach(request -> {
Path srcDir = Paths.get(tempDirAsString, request.splitPackageName);
Path destDir = Paths.get(rootPathAsString, request.splitPackageName);
final Path srcDir = Paths.get(tempDirAsString, request.splitPackageName);
final Path destDir = Paths.get(rootPathAsString, request.splitPackageName);
try {
Files.move(srcDir, destDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package io.deephaven.engine.context.util;

import javax.tools.FileObject;
import javax.tools.JavaFileManager;
import javax.tools.JavaFileObject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.Set;

public class SynchronizedJavaFileManager implements JavaFileManager {

private final JavaFileManager delegate;

public SynchronizedJavaFileManager(JavaFileManager delegate) {
this.delegate = delegate;
}

@Override
public synchronized ClassLoader getClassLoader(Location location) {
return delegate.getClassLoader(location);
}

@Override
public synchronized Iterable<JavaFileObject> list(
Location location,
String packageName,
Set<JavaFileObject.Kind> kinds,
boolean recurse) throws IOException {
return delegate.list(location, packageName, kinds, recurse);
}

@Override
public synchronized String inferBinaryName(Location location, JavaFileObject file) {
return delegate.inferBinaryName(location, file);
}

@Override
public synchronized boolean isSameFile(FileObject a, FileObject b) {
return delegate.isSameFile(a, b);
}

@Override
public synchronized boolean handleOption(String current, Iterator<String> remaining) {
return delegate.handleOption(current, remaining);
}

@Override
public synchronized boolean hasLocation(Location location) {
return delegate.hasLocation(location);
}

@Override
public synchronized JavaFileObject getJavaFileForInput(
Location location,
String className,
JavaFileObject.Kind kind) throws IOException {
return delegate.getJavaFileForInput(location, className, kind);
}

@Override
public synchronized JavaFileObject getJavaFileForOutput(
Location location,
String className,
JavaFileObject.Kind kind,
FileObject sibling) throws IOException {
return delegate.getJavaFileForOutput(location, className, kind, sibling);
}

@Override
public synchronized FileObject getFileForInput(
Location location,
String packageName,
String relativeName) throws IOException {
return delegate.getFileForInput(location, packageName, relativeName);
}

@Override
public synchronized FileObject getFileForOutput(
Location location,
String packageName,
String relativeName,
FileObject sibling) throws IOException {
return delegate.getFileForOutput(location, packageName, relativeName, sibling);
}

@Override
public synchronized void flush() throws IOException {
delegate.flush();
}

@Override
public synchronized void close() throws IOException {
delegate.close();
}

@Override
public synchronized int isSupportedOption(String option) {
return delegate.isSupportedOption(option);
}

@Override
public synchronized Location getLocationForModule(Location location, String moduleName) throws IOException {
return delegate.getLocationForModule(location, moduleName);
}

@Override
public synchronized Location getLocationForModule(Location location, JavaFileObject fo) throws IOException {
return delegate.getLocationForModule(location, fo);
}

@Override
public synchronized <S> ServiceLoader<S> getServiceLoader(Location location, Class<S> service) throws IOException {
return delegate.getServiceLoader(location, service);
}

@Override
public synchronized String inferModuleName(Location location) throws IOException {
return delegate.inferModuleName(location);
}

@Override
public synchronized Iterable<Set<Location>> listLocationsForModules(Location location) throws IOException {
return delegate.listLocationsForModules(location);
}

@Override
public synchronized boolean contains(Location location, FileObject fo) throws IOException {
return delegate.contains(location, fo);
}
}

0 comments on commit 351a281

Please sign in to comment.