Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryCompiler Batch Formula Compilation #5070

Merged
merged 21 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public interface QueryCompiler {

/**
* Compile a class.
* Compile a class and wait for completion.
*
* @param request The compilation request
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.FileUtils;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.configuration.DataDir;
Expand All @@ -16,6 +18,7 @@
import io.deephaven.engine.table.impl.util.JobScheduler;
import io.deephaven.engine.table.impl.util.OperationInitializerJobScheduler;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.ByteUtils;
import io.deephaven.util.CompletionStageFuture;
Expand All @@ -39,6 +42,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -47,7 +51,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class QueryCompilerImpl implements QueryCompiler {
public class QueryCompilerImpl implements QueryCompiler, LogOutputAppendable {

private static final Logger log = LoggerFactory.getLogger(QueryCompilerImpl.class);
/**
Expand All @@ -71,6 +75,41 @@ public class QueryCompilerImpl implements QueryCompiler {

private static boolean logEnabled = Configuration.getInstance().getBoolean("QueryCompiler.logEnabledDefault");

private static JavaCompiler compiler;
private static final AtomicReference<JavaFileManager> fileManagerCache = new AtomicReference<>();

private static void ensureJavaCompiler() {
synchronized (QueryCompilerImpl.class) {
if (compiler == null) {
compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new UncheckedDeephavenException(
"No Java compiler provided - are you using a JRE instead of a JDK?");
}
}
}
}

private static JavaFileManager acquireFileManager() {
JavaFileManager fileManager = fileManagerCache.getAndSet(null);
if (fileManager == null) {
fileManager = new SynchronizedJavaFileManager(compiler.getStandardFileManager(null, null, null));
}
return fileManager;
}

private static void releaseFileManager(@NotNull final JavaFileManager fileManager) {
// Reusing the file manager saves a lot of the time in the compilation process. However, we need to be careful
// to avoid keeping too many file handles open so we'll limit ourselves to just one outstanding file manager.
if (!fileManagerCache.compareAndSet(null, fileManager)) {
try {
fileManager.close();
} catch (final IOException err) {
throw new UncheckedIOException("Could not close JavaFileManager", err);
}
}
}

public static final String FORMULA_CLASS_PREFIX = "io.deephaven.temp";
public static final String DYNAMIC_CLASS_PREFIX = "io.deephaven.dynamic";

Expand All @@ -81,7 +120,7 @@ public static QueryCompilerImpl create(File cacheDirectory, ClassLoader classLoa
static QueryCompilerImpl createForUnitTests() {
final Path queryCompilerDir = DataDir.get()
.resolve("io.deephaven.engine.context.QueryCompiler.createForUnitTests");
return new QueryCompilerImpl(queryCompilerDir.toFile());
return new QueryCompilerImpl(queryCompilerDir.toFile(), QueryCompilerImpl.class.getClassLoader(), false);
}

private final Map<String, CompletionStageFuture<Class<?>>> knownClasses = new HashMap<>();
Expand All @@ -92,17 +131,12 @@ static QueryCompilerImpl createForUnitTests() {
private final Set<File> additionalClassLocations;
private final WritableURLClassLoader ucl;

private QueryCompilerImpl(File classDestination) {
this(classDestination, null, false);
}

private QueryCompilerImpl(
final File classDestination,
final ClassLoader parentClassLoader,
boolean isCacheDirectory) {
final ClassLoader parentClassLoaderToUse = parentClassLoader == null
? QueryCompilerImpl.class.getClassLoader()
: parentClassLoader;
@NotNull final File classDestination,
@NotNull final ClassLoader parentClassLoader,
boolean classDestinationIsAlsoClassSource) {
ensureJavaCompiler();

this.classDestination = classDestination;
ensureDirectories(this.classDestination, () -> "Failed to create missing class destination directory " +
classDestination.getAbsolutePath());
Expand All @@ -114,13 +148,24 @@ private QueryCompilerImpl(
} catch (MalformedURLException e) {
throw new UncheckedDeephavenException(e);
}
this.ucl = new WritableURLClassLoader(urls, parentClassLoaderToUse);
this.ucl = new WritableURLClassLoader(urls, parentClassLoader);

if (isCacheDirectory) {
if (classDestinationIsAlsoClassSource) {
addClassSource(classDestination);
}
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("QueryCompiler{classDestination=").append(classDestination.getAbsolutePath())
.append("}");
}

@Override
public String toString() {
return new LogOutputStringImpl().append(this).toString();
}

/**
* Enables or disables compilation logging.
*
Expand Down Expand Up @@ -192,12 +237,7 @@ public static void writeClass(final File destinationDirectory, final String clas
fileOutStream.close();
}

/**
* Compiles all requests.
*
* @param requests The compilation requests; these must be independent of each other
* @param resolvers The resolvers to use for delivering compilation results
*/
@Override
public void compile(
@NotNull final QueryCompilerRequest[] requests,
@NotNull final CompletionStageFuture.Resolver<Class<?>>[] resolvers) {
Expand Down Expand Up @@ -616,6 +656,14 @@ private static String loadIdentifyingField(Class<?> c) {
}
}

private static String makeFinalCode(String className, String classBody, String packageName) {
final String joinedEscapedBody = createEscapedJoinedString(classBody);
classBody = classBody.replaceAll("\\$CLASSNAME\\$", className);
classBody = classBody.substring(0, classBody.lastIndexOf("}"));
classBody += " public static String " + IDENTIFYING_FIELD_NAME + " = " + joinedEscapedBody + ";\n}";
return "package " + packageName + ";\n" + classBody;
}

/**
* Transform a string into the corresponding Java source code that compiles into that string. This involves escaping
* special characters, surrounding it with quotes, and (if the string is larger than the max string length for Java
Expand Down Expand Up @@ -678,14 +726,6 @@ private static int calcBytesConsumed(final char ch) {
return 3;
}

private static String makeFinalCode(String className, String classBody, String packageName) {
final String joinedEscapedBody = createEscapedJoinedString(classBody);
classBody = classBody.replaceAll("\\$CLASSNAME\\$", className);
classBody = classBody.substring(0, classBody.lastIndexOf("}"));
classBody += " public static String " + IDENTIFYING_FIELD_NAME + " = " + joinedEscapedBody + ";\n}";
return "package " + packageName + ";\n" + classBody;
}

private static class JavaSourceFromString extends SimpleJavaFileObject {
final String description;
final String code;
Expand Down Expand Up @@ -759,16 +799,6 @@ public JavaSourceFromString makeSource() {
}
}

private static final JavaCompiler compiler;
private static final JavaFileManager fileManager;
static {
compiler = ToolProvider.getSystemJavaCompiler();
if (compiler == null) {
throw new UncheckedDeephavenException("No Java compiler provided - are you using a JRE instead of a JDK?");
}
fileManager = new SynchronizedJavaFileManager(compiler.getStandardFileManager(null, null, null));
}

private void maybeCreateClasses(
@NotNull final List<CompilationRequestAttempt> requests) {
// Get the destination root directory (e.g. /tmp/workspace/cache/classes) and populate it with the package
Expand Down Expand Up @@ -816,16 +846,30 @@ private void maybeCreateClasses(
jobScheduler = new OperationInitializerJobScheduler();
}

final AtomicBoolean cleanupAlreadyRun = new AtomicBoolean();
final JavaFileManager fileManager = acquireFileManager();
final AtomicReference<RuntimeException> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
final Runnable cleanup = () -> {
try {
FileUtils.deleteRecursively(new File(tempDirAsString));
} catch (Exception e) {
// ignore errors here
if (!cleanupAlreadyRun.compareAndSet(false, true)) {
// onError could be run after cleanup if cleanup throws an exception
return;
}

latch.countDown();
try {
try {
FileUtils.deleteRecursively(new File(tempDirAsString));
} catch (Exception e) {
// ignore errors here
}
try {
releaseFileManager(fileManager);
} catch (Exception e) {
// ignore errors here
}
} finally {
latch.countDown();
}
};

final Consumer<Exception> onError = err -> {
Expand All @@ -841,7 +885,8 @@ private void maybeCreateClasses(
0, numTasks, (context, jobId, nestedErrorConsumer) -> {
final int startInclusive = jobId * requestsPerTask;
final int endExclusive = Math.min(requests.size(), (jobId + 1) * requestsPerTask);
doCreateClasses(requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive);
doCreateClasses(
fileManager, requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive);
}, cleanup, onError);

try {
Expand All @@ -860,6 +905,7 @@ private void maybeCreateClasses(
}

private void doCreateClasses(
@NotNull final JavaFileManager fileManager,
@NotNull final List<CompilationRequestAttempt> requests,
@NotNull final String rootPathAsString,
@NotNull final String tempDirAsString,
Expand All @@ -869,21 +915,23 @@ private void doCreateClasses(
// If any of our requests fail to compile then the JavaCompiler will not write any class files at all. The
// non-failing requests will be retried in a second pass that is expected to succeed. This enables us to
// fulfill futures independent of each other; otherwise a single failure would taint all requests in a batch.
final boolean wantRetry = doCreateClassesSingleRound(requests, rootPathAsString, tempDirAsString,
final boolean wantRetry = doCreateClassesSingleRound(fileManager, requests, rootPathAsString, tempDirAsString,
startInclusive, endExclusive, toRetry);
if (!wantRetry) {
return;
}

final List<CompilationRequestAttempt> ignored = new ArrayList<>();
if (doCreateClassesSingleRound(toRetry, rootPathAsString, tempDirAsString, 0, toRetry.size(), ignored)) {
if (doCreateClassesSingleRound(fileManager, toRetry, rootPathAsString, tempDirAsString, 0, toRetry.size(),
ignored)) {
// We only retried compilation units that did not fail on the first pass, so we should not have any failures
// on the second pass.
throw new IllegalStateException("Unexpected failure during second pass of compilation");
}
}

private boolean doCreateClassesSingleRound(
@NotNull final JavaFileManager fileManager,
@NotNull final List<CompilationRequestAttempt> requests,
@NotNull final String rootPathAsString,
@NotNull final String tempDirAsString,
Expand Down Expand Up @@ -996,8 +1044,8 @@ private static String getJavaClassPath() {
}

// IntelliJ will bundle a very large class path into an empty jar with a Manifest that will define the full
// class path
// Look for this being used during compile time, so the full class path can be sent into the compile call
// class path. Look for this being used during compile time, so the full class path can be sent into the compile
// call.
final String intellijClassPathJarRegex = ".*classpath[0-9]*\\.jar.*";
if (javaClasspath.matches(intellijClassPathJarRegex)) {
try {
Expand All @@ -1014,8 +1062,7 @@ private static String getJavaClassPath() {
final String extendedClassPath = (String) attributes.get(classPathAttribute);
if (extendedClassPath != null) {
// Parses the files in the manifest description an changes their format to drop the "file:/"
// and
// use the default path separator
// and use the default path separator
final String filePaths = Stream.of(extendedClassPath.split("file:/"))
.map(String::trim)
.filter(fileName -> !fileName.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ public Table selectDistinctInternal(Collection<? extends Selectable> columns) {
return null;
}

final Set<String> newColumns = new HashSet<>();
final Set<String> partitioningDerivedColumnNames = new HashSet<>();
for (final SelectColumn selectColumn : selectColumns) {
if (!((PartitionAwareSourceTable) table).isValidAgainstColumnPartitionTable(
selectColumn.getColumns(), selectColumn.getColumnArrays(), newColumns)) {
selectColumn.getColumns(), selectColumn.getColumnArrays(), partitioningDerivedColumnNames)) {
return null;
}
newColumns.add(selectColumn.getName());
partitioningDerivedColumnNames.add(selectColumn.getName());
}
return table.selectDistinct(selectColumns);
}
Expand Down Expand Up @@ -290,15 +290,15 @@ public final Table selectDistinct(@NotNull final Collection<? extends Selectable
SelectAndViewAnalyzer.initializeSelectColumns(
definition.getColumnNameMap(), selectColumns.toArray(SelectColumn[]::new));

final Set<String> newColumns = new HashSet<>();
final Set<String> partitioningDerivedColumnNames = new HashSet<>();
for (final SelectColumn selectColumn : selectColumns) {
if (!isValidAgainstColumnPartitionTable(
selectColumn.getColumns(), selectColumn.getColumnArrays(), newColumns)) {
selectColumn.getColumns(), selectColumn.getColumnArrays(), partitioningDerivedColumnNames)) {
// Be sure to invoke the super-class version of this method, rather than the array-based one that
// delegates to this method.
return super.selectDistinct(selectColumns);
}
newColumns.add(selectColumn.getName());
partitioningDerivedColumnNames.add(selectColumn.getName());
}

// Ensure that the location table is available and populated with non-null, non-empty locations.
Expand All @@ -317,11 +317,12 @@ private boolean isValidAgainstColumnPartitionTable(
private boolean isValidAgainstColumnPartitionTable(
@NotNull final Collection<String> columnNames,
@NotNull final Collection<String> columnArrayNames,
@NotNull final Collection<String> newColumns) {
@NotNull final Collection<String> partitioningDerivedColumnNames) {
if (!columnArrayNames.isEmpty()) {
return false;
}
return columnNames.stream().allMatch(
columnName -> partitioningColumnDefinitions.containsKey(columnName) || newColumns.contains(columnName));
columnName -> partitioningColumnDefinitions.containsKey(columnName)
|| partitioningDerivedColumnNames.contains(columnName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.deephaven.util.CompletionStageFuture;
import io.deephaven.util.datastructures.CachingSupplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -43,6 +44,7 @@ static QueryCompilerRequestProcessor.BatchProcessor batch() {
/**
* @return a CachingSupplier that supplies a snapshot of the current query scope variables
*/
@VisibleForTesting
static CachingSupplier<Map<String, Object>> newQueryScopeVariableSupplier() {
final QueryScope queryScope = ExecutionContext.getContext().getQueryScope();
return new CachingSupplier<>(() -> Collections.unmodifiableMap(
Expand Down
Loading
Loading