From 351a281485b9f3857e6aee9fc4c914213164d94c Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Mon, 29 Jan 2024 15:50:35 -0700 Subject: [PATCH] Synchronize JFM Stuff --- .../engine/context/QueryCompiler.java | 132 ++++++++++++------ .../util/SynchronizedJavaFileManager.java | 132 ++++++++++++++++++ 2 files changed, 221 insertions(+), 43 deletions(-) create mode 100644 engine/context/src/main/java/io/deephaven/engine/context/util/SynchronizedJavaFileManager.java diff --git a/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java b/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java index ae1554b4697..777712b12cd 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java @@ -9,6 +9,7 @@ import io.deephaven.configuration.Configuration; import io.deephaven.configuration.DataDir; import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.engine.context.util.SynchronizedJavaFileManager; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.util.ByteUtils; @@ -46,7 +47,8 @@ public class QueryCompiler { /** A flag to externally disable parallel compilation. */ - public static boolean DISABLE_PARALLEL_COMPILE = false; + public static volatile boolean DISABLE_PARALLEL_COMPILE = false; + public static volatile boolean DISABLE_SHARED_COMPILER = false; private static final Logger log = LoggerFactory.getLogger(QueryCompiler.class); /** @@ -535,24 +537,15 @@ private void compileHelper( for (int ii = 0, jj = 0; ii < requests.size(); ++ii) { if (!compiled[ii]) { helperRequests[jj++] = new CreateClassHelperRequest( - requests.get(ii).className, requests.get(ii).classBody, packageName[ii], fqClassName[ii]); + ii, + requests.get(ii).className, + requests.get(ii).classBody, + packageName[ii], + fqClassName[ii]); } } - long startTm = System.nanoTime(); - int parallelismFactor = ForkJoinPool.getCommonPoolParallelism(); - int requestsPerTask = Math.max(32, (helperRequests.length + parallelismFactor - 1) / parallelismFactor); - if (DISABLE_PARALLEL_COMPILE || parallelismFactor == 1 || requestsPerTask >= helperRequests.length) { - maybeCreateClass(helperRequests, 0, helperRequests.length); - } else { - int numTasks = (helperRequests.length + requestsPerTask - 1) / requestsPerTask; - IntStream.range(0, numTasks).parallel().forEach(jobId -> { - maybeCreateClass(helperRequests, - jobId * requestsPerTask, - Math.min(helperRequests.length, (jobId + 1) * requestsPerTask)); - }); - } - log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm)/1e9)).append("s.").endl(); + maybeCreateClass(helperRequests); // We could be running on a screwy filesystem that is slow (e.g. NFS). If we wrote a file and can't load it // ... then give the filesystem some time. All requests should use the same deadline. @@ -758,16 +751,18 @@ public CharSequence getCharContent(boolean ignoreEncodingErrors) { } private static class CreateClassHelperRequest { + final int requestIndex; final String fqClassName; final String finalCode; final String[] splitPackageName; - // String className, String code, String packageName, String fqClassName private CreateClassHelperRequest( + final int requestIndex, @NotNull final String className, @NotNull final String code, @NotNull final String packageName, @NotNull final String fqClassName) { + this.requestIndex = requestIndex; this.fqClassName = fqClassName; finalCode = makeFinalCode(className, code, packageName); @@ -796,9 +791,7 @@ public JavaSourceFromString makeSource() { } private void maybeCreateClass( - @NotNull final CreateClassHelperRequest[] requests, - final int startInclusive, - final int endExclusive) { + @NotNull final CreateClassHelperRequest[] requests) { // Get the destination root directory (e.g. /tmp/workspace/cache/classes) and populate it with the package // directories (e.g. io/deephaven/test) if they are not already there. This will be useful later. // Also create a temp directory e.g. /tmp/workspace/cache/classes/temporaryCompilationDirectory12345 @@ -823,18 +816,67 @@ private void maybeCreateClass( throw new UncheckedIOException(ioe); } + final JavaCompiler compiler; + final JavaFileManager fileManager; + + if (!DISABLE_SHARED_COMPILER) { + compiler = ToolProvider.getSystemJavaCompiler(); + if (compiler == null) { + throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?"); + } + + fileManager = new SynchronizedJavaFileManager( + compiler.getStandardFileManager(null, null, null)); + } else { + compiler = null; + fileManager = null; + } + + boolean exceptionCaught = false; try { - maybeCreateClassHelper(requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive); + long startTm = System.nanoTime(); + int parallelismFactor = ForkJoinPool.getCommonPoolParallelism(); + log.warn().append("Compiling with parallelism factor of: ").append(parallelismFactor).endl(); + int requestsPerTask = Math.max(32, (requests.length + parallelismFactor - 1) / parallelismFactor); + if (DISABLE_PARALLEL_COMPILE || parallelismFactor == 1 || requestsPerTask >= requests.length) { + maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString, + 0, requests.length); + } else { + int numTasks = (requests.length + requestsPerTask - 1) / requestsPerTask; + IntStream.range(0, numTasks).parallel().forEach(jobId -> { + final int startInclusive = jobId * requestsPerTask; + final int endExclusive = Math.min(requests.length, (jobId + 1) * requestsPerTask); + maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString, + startInclusive, endExclusive); + }); + } + log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm) / 1e9)).append("s.").endl(); + } catch (final Throwable t) { + exceptionCaught = true; + throw t; } finally { try { FileUtils.deleteRecursively(new File(tempDirAsString)); } catch (Exception e) { // ignore errors here } + + if (fileManager != null) { + try { + fileManager.close(); + } catch (IOException ioe) { + if (!exceptionCaught) { + // noinspection ThrowFromFinallyBlock + throw new UncheckedIOException("Could not close JavaFileManager", ioe); + } + } + } } } private void maybeCreateClassHelper( + JavaCompiler compiler, + JavaFileManager fileManager, @NotNull final CreateClassHelperRequest[] requests, @NotNull final String rootPathAsString, @NotNull final String tempDirAsString, @@ -842,38 +884,42 @@ private void maybeCreateClassHelper( final int endExclusive) { final StringWriter compilerOutput = new StringWriter(); - final JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - if (compiler == null) { - throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?"); + if (DISABLE_SHARED_COMPILER) { + compiler = ToolProvider.getSystemJavaCompiler(); + if (compiler == null) { + throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?"); + } + + fileManager = compiler.getStandardFileManager(null, null, null); } final String classPathAsString = getClassPath() + File.pathSeparator + getJavaClassPath(); final List compilerOptions = Arrays.asList("-d", tempDirAsString, "-cp", classPathAsString); - final JavaFileManager fileManager = compiler.getStandardFileManager(null, null, null); - boolean result; boolean exceptionThrown = false; try { result = compiler.getTask(compilerOutput, - fileManager, - null, - compilerOptions, - null, - Arrays.stream(requests, startInclusive, endExclusive) - .map(CreateClassHelperRequest::makeSource) - .collect(Collectors.toList())) + fileManager, + diagnostic -> log.error().append("Reporting Error: ").append(diagnostic.toString()).endl(), + compilerOptions, + null, + Arrays.stream(requests, startInclusive, endExclusive) + .map(CreateClassHelperRequest::makeSource) + .collect(Collectors.toList())) .call(); - } catch (final Throwable err) { + } catch (final Throwable t) { exceptionThrown = true; - throw err; + throw t; } finally { - try { - fileManager.close(); - } catch (final IOException ioe) { - if (!exceptionThrown) { - // noinspection ThrowFromFinallyBlock - throw new UncheckedIOException("Could not close JavaFileManager", ioe); + if (DISABLE_SHARED_COMPILER) { + try { + fileManager.close(); + } catch (IOException ioe) { + if (!exceptionThrown) { + // noinspection ThrowFromFinallyBlock + throw new UncheckedIOException("Could not close JavaFileManager", ioe); + } } } } @@ -887,8 +933,8 @@ private void maybeCreateClassHelper( // We want to atomically move it to e.g. // /tmp/workspace/cache/classes/io/deephaven/test/cm12862183232603186v52_0/{various class files} Arrays.stream(requests, startInclusive, endExclusive).forEach(request -> { - Path srcDir = Paths.get(tempDirAsString, request.splitPackageName); - Path destDir = Paths.get(rootPathAsString, request.splitPackageName); + final Path srcDir = Paths.get(tempDirAsString, request.splitPackageName); + final Path destDir = Paths.get(rootPathAsString, request.splitPackageName); try { Files.move(srcDir, destDir, StandardCopyOption.ATOMIC_MOVE); } catch (IOException ioe) { diff --git a/engine/context/src/main/java/io/deephaven/engine/context/util/SynchronizedJavaFileManager.java b/engine/context/src/main/java/io/deephaven/engine/context/util/SynchronizedJavaFileManager.java new file mode 100644 index 00000000000..80a4d0c1667 --- /dev/null +++ b/engine/context/src/main/java/io/deephaven/engine/context/util/SynchronizedJavaFileManager.java @@ -0,0 +1,132 @@ +package io.deephaven.engine.context.util; + +import javax.tools.FileObject; +import javax.tools.JavaFileManager; +import javax.tools.JavaFileObject; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.ServiceLoader; +import java.util.Set; + +public class SynchronizedJavaFileManager implements JavaFileManager { + + private final JavaFileManager delegate; + + public SynchronizedJavaFileManager(JavaFileManager delegate) { + this.delegate = delegate; + } + + @Override + public synchronized ClassLoader getClassLoader(Location location) { + return delegate.getClassLoader(location); + } + + @Override + public synchronized Iterable list( + Location location, + String packageName, + Set kinds, + boolean recurse) throws IOException { + return delegate.list(location, packageName, kinds, recurse); + } + + @Override + public synchronized String inferBinaryName(Location location, JavaFileObject file) { + return delegate.inferBinaryName(location, file); + } + + @Override + public synchronized boolean isSameFile(FileObject a, FileObject b) { + return delegate.isSameFile(a, b); + } + + @Override + public synchronized boolean handleOption(String current, Iterator remaining) { + return delegate.handleOption(current, remaining); + } + + @Override + public synchronized boolean hasLocation(Location location) { + return delegate.hasLocation(location); + } + + @Override + public synchronized JavaFileObject getJavaFileForInput( + Location location, + String className, + JavaFileObject.Kind kind) throws IOException { + return delegate.getJavaFileForInput(location, className, kind); + } + + @Override + public synchronized JavaFileObject getJavaFileForOutput( + Location location, + String className, + JavaFileObject.Kind kind, + FileObject sibling) throws IOException { + return delegate.getJavaFileForOutput(location, className, kind, sibling); + } + + @Override + public synchronized FileObject getFileForInput( + Location location, + String packageName, + String relativeName) throws IOException { + return delegate.getFileForInput(location, packageName, relativeName); + } + + @Override + public synchronized FileObject getFileForOutput( + Location location, + String packageName, + String relativeName, + FileObject sibling) throws IOException { + return delegate.getFileForOutput(location, packageName, relativeName, sibling); + } + + @Override + public synchronized void flush() throws IOException { + delegate.flush(); + } + + @Override + public synchronized void close() throws IOException { + delegate.close(); + } + + @Override + public synchronized int isSupportedOption(String option) { + return delegate.isSupportedOption(option); + } + + @Override + public synchronized Location getLocationForModule(Location location, String moduleName) throws IOException { + return delegate.getLocationForModule(location, moduleName); + } + + @Override + public synchronized Location getLocationForModule(Location location, JavaFileObject fo) throws IOException { + return delegate.getLocationForModule(location, fo); + } + + @Override + public synchronized ServiceLoader getServiceLoader(Location location, Class service) throws IOException { + return delegate.getServiceLoader(location, service); + } + + @Override + public synchronized String inferModuleName(Location location) throws IOException { + return delegate.inferModuleName(location); + } + + @Override + public synchronized Iterable> listLocationsForModules(Location location) throws IOException { + return delegate.listLocationsForModules(location); + } + + @Override + public synchronized boolean contains(Location location, FileObject fo) throws IOException { + return delegate.contains(location, fo); + } +}