Skip to content

Commit

Permalink
Add property for eager building of plan
Browse files Browse the repository at this point in the history
As part of new native plan checker SPI, the new property
eager-plan-validation-enabled will enable the eager building and
validation of a logical plan so that any errors or incompatibilities
in the plan will cause the query to fail quickly, before queueing
and cluster resources are assigned to keep queries with invalid
plans from holding slots in the queue.

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
  • Loading branch information
BryanCutler committed Oct 9, 2024
1 parent 83cb0f6 commit 5e0f3fe
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 5 deletions.
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ Number of local parallel table writer threads per worker for partitioned writes.
set, the number set by ``task_writer_count`` will be used. It is required to be a power
of two for a Java query engine.

``eager-plan-validation-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

This property enables the eager building and validation of a logical plan.
When enabled, the logical plan will begin to be built and validated before
queueing and allocation of cluster resources so that any errors or
incompatibilities in the query plan will fail quickly and inform the user.

.. _tuning-memory:

Memory Management Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public final class SystemSessionProperties
public static final String REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION = "rewrite_expression_with_constant_expression";
public static final String PRINT_ESTIMATED_STATS_FROM_CACHE = "print_estimated_stats_from_cache";
public static final String REMOVE_CROSS_JOIN_WITH_CONSTANT_SINGLE_ROW_INPUT = "remove_cross_join_with_constant_single_row_input";
public static final String EAGER_PLAN_VALIDATION_ENABLED = "eager_plan_validation_enabled";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -2018,6 +2019,11 @@ public SystemSessionProperties(
"If one input of the cross join is a single row with constant value, remove this cross join and replace with a project node",
featuresConfig.isRemoveCrossJoinWithSingleConstantRow(),
false),
booleanProperty(
EAGER_PLAN_VALIDATION_ENABLED,
"Enable eager building and validation of logical plan before queueing",
featuresConfig.isEagerPlanValidationEnabled(),
false),
new PropertyMetadata<>(
DEFAULT_VIEW_SECURITY_MODE,
format("Set default view security mode. Options are: %s",
Expand Down Expand Up @@ -3344,6 +3350,11 @@ public static boolean isRewriteExpressionWithConstantEnabled(Session session)
return session.getSystemProperty(REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION, Boolean.class);
}

public static boolean isEagerPlanValidationEnabled(Session session)
{
return session.getSystemProperty(EAGER_PLAN_VALIDATION_ENABLED, Boolean.class);
}

public static CreateView.Security getDefaultViewSecurityMode(Session session)
{
return session.getSystemProperty(DEFAULT_VIEW_SECURITY_MODE, CreateView.Security.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.inject.Inject;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public class EagerPlanValidationExecutionMBean
{
private final ThreadPoolExecutorMBean executorMBean;

@Inject
public EagerPlanValidationExecutionMBean(@ForEagerPlanValidation ExecutorService executor)
{
this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executor);
}

@Managed
@Nested
public ThreadPoolExecutorMBean getExecutor()
{
return executorMBean;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForEagerPlanValidation
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static com.facebook.presto.SystemSessionProperties.getExecutionPolicy;
import static com.facebook.presto.SystemSessionProperties.getQueryAnalyzerTimeout;
import static com.facebook.presto.SystemSessionProperties.isEagerPlanValidationEnabled;
import static com.facebook.presto.SystemSessionProperties.isLogInvokedFunctionNamesEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpoolingOutputBufferEnabled;
import static com.facebook.presto.common.RuntimeMetricName.FRAGMENT_PLAN_TIME_NANOS;
Expand Down Expand Up @@ -141,6 +144,8 @@ public class SqlQueryExecution
private final PlanCanonicalInfoProvider planCanonicalInfoProvider;
private final QueryAnalysis queryAnalysis;
private final AnalyzerContext analyzerContext;
private final CompletableFuture<PlanRoot> planFuture;
private final AtomicBoolean planFutureLocked = new AtomicBoolean();

private SqlQueryExecution(
QueryAnalyzer queryAnalyzer,
Expand All @@ -159,6 +164,7 @@ private SqlQueryExecution(
ExecutorService queryExecutor,
ScheduledExecutorService timeoutThreadExecutor,
SectionExecutionFactory sectionExecutionFactory,
ExecutorService eagerPlanValidationExecutor,
InternalNodeManager internalNodeManager,
ExecutionPolicy executionPolicy,
SplitSchedulerStats schedulerStats,
Expand Down Expand Up @@ -243,6 +249,10 @@ private SqlQueryExecution(
}
}
}

// Optionally build and validate plan immediately, before execution begins
planFuture = isEagerPlanValidationEnabled(getSession()) ?
CompletableFuture.supplyAsync(this::runCreateLogicalPlanAsync, eagerPlanValidationExecutor) : null;
}
}

Expand Down Expand Up @@ -460,8 +470,13 @@ public void start()
Thread.currentThread(),
timeoutThreadExecutor,
getQueryAnalyzerTimeout(getSession()))) {
// create logical plan for the query
plan = createLogicalPlanAndOptimize();
// If planFuture has not started, cancel and build plan in current thread
if (planFuture != null && !planFutureLocked.compareAndSet(false, true)) {
plan = planFuture.get();
}
else {
plan = createLogicalPlanAndOptimize();
}
}

metadata.beginQuery(getSession(), plan.getConnectors());
Expand Down Expand Up @@ -590,6 +605,21 @@ private PlanRoot createLogicalPlanAndOptimize()
}
}

private PlanRoot runCreateLogicalPlanAsync()
{
try {
// Check if creating plan async has been cancelled
if (planFutureLocked.compareAndSet(false, true)) {
return createLogicalPlanAndOptimize();
}
return null;
}
catch (Throwable e) {
fail(e);
throw e;
}
}

private void planDistribution(PlanRoot plan)
{
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager::getSplits);
Expand Down Expand Up @@ -862,6 +892,7 @@ public static class SqlQueryExecutionFactory
private final ScheduledExecutorService timeoutThreadExecutor;
private final ExecutorService queryExecutor;
private final SectionExecutionFactory sectionExecutionFactory;
private final ExecutorService eagerPlanValidationExecutor;
private final InternalNodeManager internalNodeManager;
private final Map<String, ExecutionPolicy> executionPolicies;
private final StatsCalculator statsCalculator;
Expand All @@ -883,6 +914,7 @@ public static class SqlQueryExecutionFactory
@ForQueryExecution ExecutorService queryExecutor,
@ForTimeoutThread ScheduledExecutorService timeoutThreadExecutor,
SectionExecutionFactory sectionExecutionFactory,
@ForEagerPlanValidation ExecutorService eagerPlanValidationExecutor,
InternalNodeManager internalNodeManager,
Map<String, ExecutionPolicy> executionPolicies,
SplitSchedulerStats schedulerStats,
Expand All @@ -904,6 +936,7 @@ public static class SqlQueryExecutionFactory
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.timeoutThreadExecutor = requireNonNull(timeoutThreadExecutor, "timeoutThreadExecutor is null");
this.sectionExecutionFactory = requireNonNull(sectionExecutionFactory, "sectionExecutionFactory is null");
this.eagerPlanValidationExecutor = requireNonNull(eagerPlanValidationExecutor, "eagerPlanValidationExecutor is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.executionPolicies = requireNonNull(executionPolicies, "schedulerPolicies is null");
this.planOptimizers = planOptimizers.getPlanningTimeOptimizers();
Expand Down Expand Up @@ -946,6 +979,7 @@ public QueryExecution createQueryExecution(
queryExecutor,
timeoutThreadExecutor,
sectionExecutionFactory,
eagerPlanValidationExecutor,
internalNodeManager,
executionPolicy,
schedulerStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import com.facebook.presto.event.QueryMonitorConfig;
import com.facebook.presto.event.QueryProgressMonitor;
import com.facebook.presto.execution.ClusterSizeMonitor;
import com.facebook.presto.execution.EagerPlanValidationExecutionMBean;
import com.facebook.presto.execution.ExecutionFactoriesManager;
import com.facebook.presto.execution.ExplainAnalyzeContext;
import com.facebook.presto.execution.ForEagerPlanValidation;
import com.facebook.presto.execution.ForQueryExecution;
import com.facebook.presto.execution.ForTimeoutThread;
import com.facebook.presto.execution.NodeResourceStatusConfig;
Expand Down Expand Up @@ -83,6 +85,7 @@
import com.facebook.presto.server.remotetask.RemoteTaskStats;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanOptimizers;
Expand All @@ -104,8 +107,11 @@

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
Expand Down Expand Up @@ -264,6 +270,8 @@ protected void setup(Binder binder)
.toInstance(newCachedThreadPool(threadsNamed("query-execution-%s")));
binder.bind(QueryExecutionMBean.class).in(Scopes.SINGLETON);
newExporter(binder).export(QueryExecutionMBean.class).as(generatedNameOf(QueryExecution.class));
binder.bind(EagerPlanValidationExecutionMBean.class).in(Scopes.SINGLETON);
newExporter(binder).export(EagerPlanValidationExecutionMBean.class).withGeneratedName();

binder.bind(SplitSchedulerStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(SplitSchedulerStats.class).withGeneratedName();
Expand Down Expand Up @@ -376,6 +384,14 @@ public static ScheduledExecutorService createTimeoutThreadExecutor()
return executor;
}

@Provides
@Singleton
@ForEagerPlanValidation
public static ExecutorService createEagerPlanValidationExecutor(FeaturesConfig featuresConfig)
{
return new ThreadPoolExecutor(0, featuresConfig.getEagerPlanValidationThreadPoolSize(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue(), threadsNamed("plan-validation-%s"));
}

private void bindLowMemoryKiller(String name, Class<? extends LowMemoryKiller> clazz)
{
install(installModuleIf(
Expand All @@ -395,7 +411,8 @@ public ExecutorCleanup(
@ForQueryExecution ExecutorService queryExecutionExecutor,
@ForScheduler ScheduledExecutorService schedulerExecutor,
@ForTransactionManager ExecutorService transactionFinishingExecutor,
@ForTransactionManager ScheduledExecutorService transactionIdleExecutor)
@ForTransactionManager ScheduledExecutorService transactionIdleExecutor,
@ForEagerPlanValidation ExecutorService eagerPlanValidationExecutor)
{
executors = ImmutableList.<ExecutorService>builder()
.add(statementResponseExecutor)
Expand All @@ -404,6 +421,7 @@ public ExecutorCleanup(
.add(schedulerExecutor)
.add(transactionFinishingExecutor)
.add(transactionIdleExecutor)
.add(eagerPlanValidationExecutor)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ public class FeaturesConfig

private boolean isInlineProjectionsOnValuesEnabled;

private boolean eagerPlanValidationEnabled;
private int eagerPlanValidationThreadPoolSize = 20;

public enum PartitioningPrecisionStrategy
{
// Let Presto decide when to repartition
Expand Down Expand Up @@ -2969,4 +2972,30 @@ public FeaturesConfig setInlineProjectionsOnValues(boolean isInlineProjectionsOn
this.isInlineProjectionsOnValuesEnabled = isInlineProjectionsOnValuesEnabled;
return this;
}

@Config("eager-plan-validation-enabled")
@ConfigDescription("Enable eager building and validation of logical plan before queueing")
public FeaturesConfig setEagerPlanValidationEnabled(boolean eagerPlanValidationEnabled)
{
this.eagerPlanValidationEnabled = eagerPlanValidationEnabled;
return this;
}

public boolean isEagerPlanValidationEnabled()
{
return this.eagerPlanValidationEnabled;
}

@Config("eager-plan-validation-thread-pool-size")
@ConfigDescription("Size of thread pool to use when eager plan validation is enabled")
public FeaturesConfig setEagerPlanValidationThreadPoolSize(int eagerPlanValidationThreadPoolSize)
{
this.eagerPlanValidationThreadPoolSize = eagerPlanValidationThreadPoolSize;
return this;
}

public int getEagerPlanValidationThreadPoolSize()
{
return this.eagerPlanValidationThreadPoolSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ public void testDefaults()
.setPrintEstimatedStatsFromCache(false)
.setRemoveCrossJoinWithSingleConstantRow(true)
.setUseHistograms(false)
.setInlineProjectionsOnValues(false));
.setInlineProjectionsOnValues(false)
.setEagerPlanValidationEnabled(false)
.setEagerPlanValidationThreadPoolSize(20));
}

@Test
Expand Down Expand Up @@ -460,6 +462,8 @@ public void testExplicitPropertyMappings()
.put("optimizer.remove-cross-join-with-single-constant-row", "false")
.put("optimizer.use-histograms", "true")
.put("optimizer.inline-projections-on-values", "true")
.put("eager-plan-validation-enabled", "true")
.put("eager-plan-validation-thread-pool-size", "2")
.build();

FeaturesConfig expected = new FeaturesConfig()
Expand Down Expand Up @@ -661,7 +665,9 @@ public void testExplicitPropertyMappings()
.setPrintEstimatedStatsFromCache(true)
.setRemoveCrossJoinWithSingleConstantRow(false)
.setUseHistograms(true)
.setInlineProjectionsOnValues(true);
.setInlineProjectionsOnValues(true)
.setEagerPlanValidationEnabled(true)
.setEagerPlanValidationThreadPoolSize(2);
assertFullMapping(properties, expected);
}

Expand Down

0 comments on commit 5e0f3fe

Please sign in to comment.