Skip to content

Commit

Permalink
Add SPI to support plugin for custom plan checkers
Browse files Browse the repository at this point in the history
This adds a new SPI to support plugin custom plan checkers to be
provided for validating intermediate, final, and fragment stages of
a logical plan. The motivation for this is to allow for a native plan
checker that will eagerly validate a plan on the native sidecar to
quickly fail the query if there are incompatibilities.

Add unit test to verify that a queued query can be validated
and fail while queued. This is done by using the new custom
plan checker SPI to add plugin that will trigger a failure when
validating the plan.

See also: prestodb#23596
RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
  • Loading branch information
BryanCutler authored and Joe-Abraham committed Oct 15, 2024
1 parent 69aeccf commit 0e9b659
Show file tree
Hide file tree
Showing 32 changed files with 578 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section)
.forEach(currentSubPlan -> {
Optional<PlanFragment> newPlanFragment = performRuntimeOptimizations(currentSubPlan);
if (newPlanFragment.isPresent()) {
planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), session, metadata, warningCollector);
planChecker.validatePlanFragment(newPlanFragment.get(), session, metadata, warningCollector);
oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;
import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory;
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
Expand All @@ -48,6 +49,7 @@
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.tracing.TracerProviderManager;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
Expand Down Expand Up @@ -131,6 +133,7 @@ public class PluginManager
private final AnalyzerProviderManager analyzerProviderManager;
private final QueryPreparerProviderManager queryPreparerProviderManager;
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private final PlanCheckerProviderManager planCheckerProviderManager;

@Inject
public PluginManager(
Expand All @@ -152,7 +155,8 @@ public PluginManager(
ClusterTtlProviderManager clusterTtlProviderManager,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
NodeStatusNotificationManager nodeStatusNotificationManager,
PlanCheckerProviderManager planCheckerProviderManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -184,6 +188,7 @@ public PluginManager(
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -348,6 +353,11 @@ public void installPlugin(Plugin plugin)
log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName());
nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory);
}

for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) {
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.storage.TempStorageModule;
import com.facebook.presto.tracing.TracerProviderManager;
Expand Down Expand Up @@ -177,6 +178,7 @@ public void run()
injector.getInstance(TracerProviderManager.class).loadTracerProvider();
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders();
startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.analyzer.ViewDefinition;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.PredicateCompiler;
Expand Down Expand Up @@ -200,7 +202,9 @@
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.facebook.presto.sql.tree.Expression;
Expand Down Expand Up @@ -625,6 +629,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
// plan
jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionSerializer.class);
jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class);
binder.bind(SimplePlanFragmentSerde.class).to(JsonCodecSimplePlanFragmentSerde.class).in(Scopes.SINGLETON);

// history statistics
configBinder(binder).bindConfig(HistoryBasedOptimizationConfig.class);
Expand Down Expand Up @@ -785,6 +791,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
//Optional Status Detector
newOptionalBinder(binder, NodeStatusService.class);
binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON);

binder.bind(PlanCheckerProviderManager.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.testing.ProcedureTester;
import com.facebook.presto.testing.TestingAccessControlManager;
Expand Down Expand Up @@ -173,6 +174,7 @@ public class TestingPrestoServer
private final boolean nodeSchedulerIncludeCoordinator;
private final ServerInfoResource serverInfoResource;
private final ResourceManagerClusterStateProvider clusterStateProvider;
private final PlanCheckerProviderManager planCheckerProviderManager;

public static class TestShutdownAction
implements ShutdownAction
Expand Down Expand Up @@ -379,6 +381,7 @@ public TestingPrestoServer(
statsCalculator = injector.getInstance(StatsCalculator.class);
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
clusterStateProvider = null;
planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
}
else if (resourceManager) {
dispatchManager = null;
Expand All @@ -390,6 +393,7 @@ else if (resourceManager) {
statsCalculator = null;
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class);
planCheckerProviderManager = null;
}
else if (coordinatorSidecar) {
dispatchManager = null;
Expand All @@ -401,6 +405,7 @@ else if (coordinatorSidecar) {
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
else if (catalogServer) {
dispatchManager = null;
Expand All @@ -412,6 +417,7 @@ else if (catalogServer) {
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
else {
dispatchManager = null;
Expand All @@ -423,6 +429,7 @@ else if (catalogServer) {
statsCalculator = null;
eventListenerManager = null;
clusterStateProvider = null;
planCheckerProviderManager = null;
}
localMemoryManager = injector.getInstance(LocalMemoryManager.class);
nodeManager = injector.getInstance(InternalNodeManager.class);
Expand Down Expand Up @@ -662,6 +669,11 @@ public ClusterMemoryManager getClusterMemoryManager()
return (ClusterMemoryManager) clusterMemoryManager;
}

public PlanCheckerProviderManager getPlanCheckerProviderManager()
{
return planCheckerProviderManager;
}

public GracefulShutdownHandler getGracefulShutdownHandler()
{
return gracefulShutdownHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
properties.getPartitionedSources());

Set<VariableReferenceExpression> fragmentVariableTypes = extractOutputVariables(root);
planChecker.validatePlanFragment(root, session, metadata, warningCollector);

Set<PlanNodeId> tableWriterNodeIds = PlanFragmenterUtils.getTableWriterNodeIds(root);
boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains);
if (outputTableWriterFragment) {
Expand All @@ -164,6 +162,8 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
Optional.of(statsAndCosts.getForSubplan(root)),
Optional.of(jsonFragmentPlan(root, fragmentVariableTypes, statsAndCosts.getForSubplan(root), metadata.getFunctionAndTypeManager(), session)));

planChecker.validatePlanFragment(fragment, session, metadata, warningCollector);

return new SubPlan(fragment, properties.getChildren());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.sql.planner.BasePlanFragmenter.FragmentProperties;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.sanity.PlanChecker;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;
Expand All @@ -54,13 +55,13 @@ public class PlanFragmenter
private final PlanChecker singleNodePlanChecker;

@Inject
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig)
public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false);
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true);
this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager);
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager);
}

public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.plan;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.spi.plan.SimplePlanFragmentSerde;
import com.google.inject.Inject;

import java.nio.charset.StandardCharsets;

import static java.util.Objects.requireNonNull;

public class JsonCodecSimplePlanFragmentSerde
implements SimplePlanFragmentSerde
{
private final JsonCodec<SimplePlanFragment> codec;

@Inject
public JsonCodecSimplePlanFragmentSerde(JsonCodec<SimplePlanFragment> codec)
{
this.codec = requireNonNull(codec, "SimplePlanFragment JSON codec is null");
}

@Override
public String serialize(SimplePlanFragment planFragment)
{
return new String(codec.toBytes(planFragment), StandardCharsets.UTF_8);
}

@Override
public SimplePlanFragment deserialize(String value)
{
return codec.fromBytes(value.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,35 @@
import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.plan.PlanCheckerProvider;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Multimap;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

/**
* Perform checks on the plan that may generate warnings or errors.
*/
public final class PlanChecker
{
private final Multimap<Stage, Checker> checkers;
private final PlanCheckerProviderManager planCheckerProviderManager;

@Inject
public PlanChecker(FeaturesConfig featuresConfig)
public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager)
{
this(featuresConfig, false);
this(featuresConfig, false, planCheckerProviderManager);
}

public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager planCheckerProviderManager)
{
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
ImmutableListMultimap.Builder<Stage, Checker> builder = ImmutableListMultimap.builder();
builder.putAll(
Stage.INTERMEDIATE,
Expand Down Expand Up @@ -78,25 +85,58 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode)
public void validateFinalPlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
{
checkers.get(Stage.FINAL).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFinalPlanCheckers()) {
checker.validate(planNode, warningCollector);
}
}
}

public void validateIntermediatePlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
{
checkers.get(Stage.INTERMEDIATE).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getIntermediatePlanCheckers()) {
checker.validate(planNode, warningCollector);
}
}
}

public void validatePlanFragment(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
public void validatePlanFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector)
{
checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector));
checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector));
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFragmentPlanCheckers()) {
checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector);
}
}
}

public interface Checker
{
void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector);

default void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector)
{
validate(planFragment.getRoot(), session, metadata, warningCollector);
}
}

private enum Stage
{
INTERMEDIATE, FINAL, FRAGMENT
}

private static SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment)
{
return new SimplePlanFragment(
planFragment.getId(),
planFragment.getRoot(),
planFragment.getVariables(),
planFragment.getPartitioning(),
planFragment.getTableScanSchedulingOrder(),
planFragment.getPartitioningScheme(),
planFragment.getStageExecutionDescriptor(),
planFragment.isOutputTableWriterFragment());
}
}
Loading

0 comments on commit 0e9b659

Please sign in to comment.