diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java index 9a6d37fdb950..198f23f1ba22 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SqlQueryScheduler.java @@ -594,7 +594,7 @@ private StreamingPlanSection tryCostBasedOptimize(StreamingPlanSection section) .forEach(currentSubPlan -> { Optional newPlanFragment = performRuntimeOptimizations(currentSubPlan); if (newPlanFragment.isPresent()) { - planChecker.validatePlanFragment(newPlanFragment.get().getRoot(), session, metadata, warningCollector); + planChecker.validatePlanFragment(newPlanFragment.get(), session, metadata, warningCollector); oldToNewFragment.put(currentSubPlan.getFragment(), newPlanFragment.get()); } }); diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java index 22ecb4b1c473..7983e017d024 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java @@ -36,6 +36,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -48,6 +49,7 @@ import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory; import com.facebook.presto.sql.analyzer.AnalyzerProviderManager; import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.tracing.TracerProviderManager; import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager; @@ -131,6 +133,7 @@ public class PluginManager private final AnalyzerProviderManager analyzerProviderManager; private final QueryPreparerProviderManager queryPreparerProviderManager; private final NodeStatusNotificationManager nodeStatusNotificationManager; + private final PlanCheckerProviderManager planCheckerProviderManager; @Inject public PluginManager( @@ -152,7 +155,8 @@ public PluginManager( ClusterTtlProviderManager clusterTtlProviderManager, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, TracerProviderManager tracerProviderManager, - NodeStatusNotificationManager nodeStatusNotificationManager) + NodeStatusNotificationManager nodeStatusNotificationManager, + PlanCheckerProviderManager planCheckerProviderManager) { requireNonNull(nodeInfo, "nodeInfo is null"); requireNonNull(config, "config is null"); @@ -184,6 +188,7 @@ public PluginManager( this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null"); this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null"); this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null"); + this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null"); } public void loadPlugins() @@ -348,6 +353,11 @@ public void installPlugin(Plugin plugin) log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName()); nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory); } + + for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) { + log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName()); + planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory); + } } public void installCoordinatorPlugin(CoordinatorPlugin plugin) diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 8b3aaa3009cc..0f3ba3b14df0 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -47,6 +47,7 @@ import com.facebook.presto.server.security.ServerSecurityModule; import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.parser.SqlParserOptions; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.storage.TempStorageModule; import com.facebook.presto.tracing.TracerProviderManager; @@ -177,6 +178,7 @@ public void run() injector.getInstance(TracerProviderManager.class).loadTracerProvider(); injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider(); injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification(); + injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders(); startAssociatedProcesses(injector); injector.getInstance(Announcer.class).start(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 85658fe457f7..85fd54dfb767 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -146,6 +146,8 @@ import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.analyzer.ViewDefinition; import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; import com.facebook.presto.spi.relation.DeterminismEvaluator; import com.facebook.presto.spi.relation.DomainTranslator; import com.facebook.presto.spi.relation.PredicateCompiler; @@ -200,7 +202,9 @@ import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.tree.Expression; @@ -625,6 +629,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon // plan jsonBinder(binder).addKeySerializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionSerializer.class); jsonBinder(binder).addKeyDeserializerBinding(VariableReferenceExpression.class).to(VariableReferenceExpressionDeserializer.class); + jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class); + binder.bind(SimplePlanFragmentSerde.class).to(JsonCodecSimplePlanFragmentSerde.class).in(Scopes.SINGLETON); // history statistics configBinder(binder).bindConfig(HistoryBasedOptimizationConfig.class); @@ -785,6 +791,8 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon //Optional Status Detector newOptionalBinder(binder, NodeStatusService.class); binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); + + binder.bind(PlanCheckerProviderManager.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 6157d89c19e0..86f72c2df636 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -74,6 +74,7 @@ import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.testing.ProcedureTester; import com.facebook.presto.testing.TestingAccessControlManager; @@ -173,6 +174,7 @@ public class TestingPrestoServer private final boolean nodeSchedulerIncludeCoordinator; private final ServerInfoResource serverInfoResource; private final ResourceManagerClusterStateProvider clusterStateProvider; + private final PlanCheckerProviderManager planCheckerProviderManager; public static class TestShutdownAction implements ShutdownAction @@ -379,6 +381,7 @@ public TestingPrestoServer( statsCalculator = injector.getInstance(StatsCalculator.class); eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class)); clusterStateProvider = null; + planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class); } else if (resourceManager) { dispatchManager = null; @@ -390,6 +393,7 @@ else if (resourceManager) { statsCalculator = null; eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class)); clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class); + planCheckerProviderManager = null; } else if (coordinatorSidecar) { dispatchManager = null; @@ -401,6 +405,7 @@ else if (coordinatorSidecar) { statsCalculator = null; eventListenerManager = null; clusterStateProvider = null; + planCheckerProviderManager = null; } else if (catalogServer) { dispatchManager = null; @@ -412,6 +417,7 @@ else if (catalogServer) { statsCalculator = null; eventListenerManager = null; clusterStateProvider = null; + planCheckerProviderManager = null; } else { dispatchManager = null; @@ -423,6 +429,7 @@ else if (catalogServer) { statsCalculator = null; eventListenerManager = null; clusterStateProvider = null; + planCheckerProviderManager = null; } localMemoryManager = injector.getInstance(LocalMemoryManager.class); nodeManager = injector.getInstance(InternalNodeManager.class); @@ -662,6 +669,11 @@ public ClusterMemoryManager getClusterMemoryManager() return (ClusterMemoryManager) clusterMemoryManager; } + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + return planCheckerProviderManager; + } + public GracefulShutdownHandler getGracefulShutdownHandler() { return gracefulShutdownHandler; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index e8bb552691d9..eb3d4e7ff872 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -140,8 +140,6 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan properties.getPartitionedSources()); Set fragmentVariableTypes = extractOutputVariables(root); - planChecker.validatePlanFragment(root, session, metadata, warningCollector); - Set tableWriterNodeIds = PlanFragmenterUtils.getTableWriterNodeIds(root); boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains); if (outputTableWriterFragment) { @@ -164,6 +162,8 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan Optional.of(statsAndCosts.getForSubplan(root)), Optional.of(jsonFragmentPlan(root, fragmentVariableTypes, statsAndCosts.getForSubplan(root), metadata.getFunctionAndTypeManager(), session))); + planChecker.validatePlanFragment(fragment, session, metadata, warningCollector); + return new SubPlan(fragment, properties.getChildren()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 3ef3eccaf626..f6ad388989dc 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -29,6 +29,7 @@ import com.facebook.presto.sql.planner.BasePlanFragmenter.FragmentProperties; import com.facebook.presto.sql.planner.plan.SimplePlanRewriter; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.google.common.collect.ImmutableList; import javax.inject.Inject; @@ -54,13 +55,13 @@ public class PlanFragmenter private final PlanChecker singleNodePlanChecker; @Inject - public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig) + public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitioningManager, QueryManagerConfig queryManagerConfig, FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager) { this.metadata = requireNonNull(metadata, "metadata is null"); this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.config = requireNonNull(queryManagerConfig, "queryManagerConfig is null"); - this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false); - this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true); + this.distributedPlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), false, planCheckerProviderManager); + this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager); } public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java new file mode 100644 index 000000000000..dfd68ca63a57 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/JsonCodecSimplePlanFragmentSerde.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.plan; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Inject; + +import java.nio.charset.StandardCharsets; + +import static java.util.Objects.requireNonNull; + +public class JsonCodecSimplePlanFragmentSerde + implements SimplePlanFragmentSerde +{ + private final JsonCodec codec; + + @Inject + public JsonCodecSimplePlanFragmentSerde(JsonCodec codec) + { + this.codec = requireNonNull(codec, "SimplePlanFragment JSON codec is null"); + } + + @Override + public String serialize(SimplePlanFragment planFragment) + { + return new String(codec.toBytes(planFragment), StandardCharsets.UTF_8); + } + + @Override + public SimplePlanFragment deserialize(String value) + { + return codec.fromBytes(value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index 0263b88a93fc..20495092f914 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -16,28 +16,35 @@ import com.facebook.presto.Session; import com.facebook.presto.metadata.Metadata; import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanCheckerProvider; import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.PlanFragment; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Multimap; import javax.inject.Inject; +import static java.util.Objects.requireNonNull; + /** * Perform checks on the plan that may generate warnings or errors. */ public final class PlanChecker { private final Multimap checkers; + private final PlanCheckerProviderManager planCheckerProviderManager; @Inject - public PlanChecker(FeaturesConfig featuresConfig) + public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager planCheckerProviderManager) { - this(featuresConfig, false); + this(featuresConfig, false, planCheckerProviderManager); } - public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) + public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager planCheckerProviderManager) { + this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null"); ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder(); builder.putAll( Stage.INTERMEDIATE, @@ -78,25 +85,58 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode) public void validateFinalPlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) { checkers.get(Stage.FINAL).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); + for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) { + for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFinalPlanCheckers()) { + checker.validate(planNode, warningCollector); + } + } } public void validateIntermediatePlan(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) { checkers.get(Stage.INTERMEDIATE).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); + for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) { + for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getIntermediatePlanCheckers()) { + checker.validate(planNode, warningCollector); + } + } } - public void validatePlanFragment(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector) + public void validatePlanFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector) { - checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validate(planNode, session, metadata, warningCollector)); + checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector)); + for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) { + for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFragmentPlanCheckers()) { + checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector); + } + } } public interface Checker { void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector); + + default void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector) + { + validate(planFragment.getRoot(), session, metadata, warningCollector); + } } private enum Stage { INTERMEDIATE, FINAL, FRAGMENT } + + private static SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment) + { + return new SimplePlanFragment( + planFragment.getId(), + planFragment.getRoot(), + planFragment.getVariables(), + planFragment.getPartitioning(), + planFragment.getTableScanSchedulingOrder(), + planFragment.getPartitioningScheme(), + planFragment.getStageExecutionDescriptor(), + planFragment.isOutputTableWriterFragment()); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java new file mode 100644 index 000000000000..7bbc7acaec60 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.sanity; + +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class PlanCheckerProviderManager +{ + private final SimplePlanFragmentSerde simplePlanFragmentSerde; + private final Map providerFactories = new ConcurrentHashMap<>(); + private final CopyOnWriteArrayList providers = new CopyOnWriteArrayList<>(); + + @Inject + public PlanCheckerProviderManager(SimplePlanFragmentSerde simplePlanFragmentSerde) + { + this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"); + } + + public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory planCheckerProviderFactory) + { + requireNonNull(planCheckerProviderFactory, "planCheckerProviderFactory is null"); + if (providerFactories.putIfAbsent(planCheckerProviderFactory.getName(), planCheckerProviderFactory) != null) { + throw new IllegalArgumentException(format("PlanCheckerProviderFactory '%s' is already registered", planCheckerProviderFactory.getName())); + } + } + + public void loadPlanCheckerProviders() + { + providers.addAllAbsent(providerFactories.values().stream().map(pc -> pc.create(simplePlanFragmentSerde)).collect(Collectors.toList())); + } + + public List getPlanCheckerProviders() + { + return providers; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 4bc53439ed6d..96c4c61beff7 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -140,6 +140,7 @@ import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.spi.plan.StageExecutionDescriptor; import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spiller.FileSingleStreamSpillerFactory; @@ -185,8 +186,10 @@ import com.facebook.presto.sql.planner.RemoteSourceFactory; import com.facebook.presto.sql.planner.SubPlan; import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.planPrinter.PlanPrinter; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.tree.AlterFunction; @@ -297,6 +300,7 @@ public class LocalQueryRunner private final SqlParser sqlParser; private final PlanFragmenter planFragmenter; private final InMemoryNodeManager nodeManager; + private final PlanCheckerProviderManager planCheckerProviderManager; private final PageSorter pageSorter; private final PageIndexerFactory pageIndexerFactory; private final MetadataManager metadata; @@ -432,9 +436,10 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new AnalyzePropertyManager(), transactionManager); this.splitManager = new SplitManager(metadata, new QueryManagerConfig(), nodeSchedulerConfig); - this.distributedPlanChecker = new PlanChecker(featuresConfig, false); - this.singleNodePlanChecker = new PlanChecker(featuresConfig, true); - this.planFragmenter = new PlanFragmenter(this.metadata, this.nodePartitioningManager, new QueryManagerConfig(), featuresConfig); + this.planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(jsonCodec(SimplePlanFragment.class))); + this.distributedPlanChecker = new PlanChecker(featuresConfig, false, planCheckerProviderManager); + this.singleNodePlanChecker = new PlanChecker(featuresConfig, true, planCheckerProviderManager); + this.planFragmenter = new PlanFragmenter(this.metadata, this.nodePartitioningManager, new QueryManagerConfig(), featuresConfig, planCheckerProviderManager); this.joinCompiler = new JoinCompiler(metadata); this.pageIndexerFactory = new GroupByHashPageIndexerFactory(joinCompiler); this.statsNormalizer = new StatsNormalizer(); @@ -515,7 +520,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new ThrowingClusterTtlProviderManager(), historyBasedPlanStatisticsManager, new TracerProviderManager(new TracingConfig()), - new NodeStatusNotificationManager()); + new NodeStatusNotificationManager(), + planCheckerProviderManager); connectorManager.addConnectorFactory(globalSystemConnectorFactory); connectorManager.createConnection(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of()); @@ -643,6 +649,12 @@ public ConnectorPlanOptimizerManager getPlanOptimizerManager() return planOptimizerManager; } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + return planCheckerProviderManager; + } + public PageSourceManager getPageSourceManager() { return pageSourceManager; diff --git a/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java index 2cae8bfa14d5..b19fa40da024 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java @@ -27,6 +27,7 @@ import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.transaction.TransactionManager; import org.intellij.lang.annotations.Language; @@ -54,6 +55,8 @@ public interface QueryRunner ConnectorPlanOptimizerManager getPlanOptimizerManager(); + PlanCheckerProviderManager getPlanCheckerProviderManager(); + StatsCalculator getStatsCalculator(); Optional getEventListener(); diff --git a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java index d19b15f2a185..e788dbe4e8a9 100644 --- a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java +++ b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java @@ -42,6 +42,7 @@ import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spi.plan.UnionNode; import com.facebook.presto.spi.relation.RowExpression; @@ -57,7 +58,9 @@ import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.plan.SequenceNode; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.sql.tree.Cast; import com.facebook.presto.sql.tree.SymbolReference; import com.facebook.presto.tpch.TpchColumnHandle; @@ -84,6 +87,7 @@ import java.util.Optional; import java.util.function.Function; +import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; @@ -123,6 +127,7 @@ public class TestCostCalculator private NodeScheduler nodeScheduler; private NodePartitioningManager nodePartitioningManager; private TestingRowExpressionTranslator translator; + private PlanCheckerProviderManager planCheckerProviderManager; @BeforeClass public void setUp() @@ -151,7 +156,8 @@ public void setUp() new SimpleTtlNodeSelectorConfig()); PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager, new NodeSelectionStats()); - planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig()); + planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(jsonCodec(SimplePlanFragment.class))); + planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig(), planCheckerProviderManager); translator = new TestingRowExpressionTranslator(); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java index 09cd72184cac..b569f25c7458 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java @@ -25,6 +25,7 @@ import com.facebook.presto.split.SplitManager; import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.TestingAccessControlManager; @@ -185,6 +186,12 @@ public ConnectorPlanOptimizerManager getPlanOptimizerManager() throw new UnsupportedOperationException(); } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + throw new UnsupportedOperationException(); + } + @Override public StatsCalculator getStatsCalculator() { diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index 8eb985d2a059..9e258ccf9c01 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -136,6 +136,8 @@ import com.facebook.presto.spi.PageSorter; import com.facebook.presto.spi.analyzer.ViewDefinition; import com.facebook.presto.spi.memory.ClusterMemoryPoolManager; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; import com.facebook.presto.spi.relation.DeterminismEvaluator; import com.facebook.presto.spi.relation.DomainTranslator; import com.facebook.presto.spi.relation.PredicateCompiler; @@ -186,7 +188,9 @@ import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.PlanFragmenter; import com.facebook.presto.sql.planner.PlanOptimizers; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.tracing.TracerProviderManager; @@ -292,6 +296,8 @@ protected void setup(Binder binder) jsonCodecBinder(binder).bindJsonCodec(PrestoSparkLocalShuffleWriteInfo.class); jsonCodecBinder(binder).bindJsonCodec(BatchTaskUpdateRequest.class); jsonCodecBinder(binder).bindJsonCodec(BroadcastFileInfo.class); + jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class); + binder.bind(SimplePlanFragmentSerde.class).to(JsonCodecSimplePlanFragmentSerde.class).in(Scopes.SINGLETON); // smile codecs smileCodecBinder(binder).bindSmileCodec(TaskSource.class); @@ -542,6 +548,8 @@ protected void setup(Binder binder) // extra credentials and authenticator for Presto-on-Spark newSetBinder(binder, PrestoSparkCredentialsProvider.class); newSetBinder(binder, PrestoSparkAuthenticatorProvider.class); + + binder.bind(PlanCheckerProviderManager.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java index 5f84fb6daea8..850068ac5d72 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java @@ -99,6 +99,7 @@ import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.SubPlan; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.sql.tree.Statement; import com.facebook.presto.storage.TempStorageManager; import com.facebook.presto.transaction.TransactionManager; @@ -206,6 +207,7 @@ public class PrestoSparkQueryExecutionFactory private final HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker; private final AdaptivePlanOptimizers adaptivePlanOptimizers; private final FragmentStatsProvider fragmentStatsProvider; + private final PlanCheckerProviderManager planCheckerProviderManager; @Inject public PrestoSparkQueryExecutionFactory( @@ -245,7 +247,8 @@ public PrestoSparkQueryExecutionFactory( Optional errorClassifier, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, AdaptivePlanOptimizers adaptivePlanOptimizers, - FragmentStatsProvider fragmentStatsProvider) + FragmentStatsProvider fragmentStatsProvider, + PlanCheckerProviderManager planCheckerProviderManager) { this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null"); this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null"); @@ -284,6 +287,7 @@ public PrestoSparkQueryExecutionFactory( this.historyBasedPlanStatisticsTracker = requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null").getHistoryBasedPlanStatisticsTracker(); this.adaptivePlanOptimizers = requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null"); this.fragmentStatsProvider = requireNonNull(fragmentStatsProvider, "fragmentStatsProvider is null"); + this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null"); } public static QueryInfo createQueryInfo( @@ -764,7 +768,8 @@ else if (preparedQuery.isExplainTypeValidate()) { variableAllocator, planNodeIdAllocator, fragmentStatsProvider, - bootstrapMetricsCollector); + bootstrapMetricsCollector, + planCheckerProviderManager); } } } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java index 87edae52c036..d649580af500 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java @@ -63,6 +63,7 @@ import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.transaction.TransactionManager; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -175,7 +176,8 @@ public PrestoSparkAdaptiveQueryExecution( VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, FragmentStatsProvider fragmentStatsProvider, - Optional>> bootstrapMetricsCollector) + Optional>> bootstrapMetricsCollector, + PlanCheckerProviderManager planCheckerProviderManager) { super( sparkContext, @@ -219,10 +221,10 @@ public PrestoSparkAdaptiveQueryExecution( this.adaptivePlanOptimizers = requireNonNull(adaptivePlanOptimizers, "adaptivePlanOptimizers is null").getAdaptiveOptimizers(); this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null"); this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); - this.iterativePlanFragmenter = createIterativePlanFragmenter(); + this.iterativePlanFragmenter = createIterativePlanFragmenter(requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null")); } - private IterativePlanFragmenter createIterativePlanFragmenter() + private IterativePlanFragmenter createIterativePlanFragmenter(PlanCheckerProviderManager planCheckerProviderManager) { boolean forceSingleNode = false; Function isFragmentFinished = this.executedFragments::contains; @@ -232,7 +234,7 @@ private IterativePlanFragmenter createIterativePlanFragmenter() this.planAndMore.getPlan(), isFragmentFinished, this.metadata, - new PlanChecker(this.featuresConfig, forceSingleNode), + new PlanChecker(this.featuresConfig, forceSingleNode, planCheckerProviderManager), this.idAllocator, new PrestoSparkNodePartitioningManager(this.partitioningProviderManager), this.queryManagerConfig, diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java index 015f1936c4bc..f85c08ef58eb 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java @@ -67,6 +67,7 @@ import com.facebook.presto.sql.parser.SqlParserOptions; import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.MaterializedRow; import com.facebook.presto.testing.QueryRunner; @@ -156,6 +157,7 @@ public class PrestoSparkQueryRunner private final StatsCalculator statsCalculator; private final PluginManager pluginManager; private final ConnectorManager connectorManager; + private final PlanCheckerProviderManager planCheckerProviderManager; private final Set waitTimeMetrics; private final HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager; @@ -334,6 +336,7 @@ public PrestoSparkQueryRunner( statsCalculator = injector.getInstance(StatsCalculator.class); pluginManager = injector.getInstance(PluginManager.class); connectorManager = injector.getInstance(ConnectorManager.class); + planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class); waitTimeMetrics = injector.getInstance(new Key>() {}); historyBasedPlanStatisticsManager = injector.getInstance(HistoryBasedPlanStatisticsManager.class); @@ -477,6 +480,12 @@ public ConnectorPlanOptimizerManager getPlanOptimizerManager() return connectorPlanOptimizerManager; } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + return planCheckerProviderManager; + } + @Override public StatsCalculator getStatsCalculator() { diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java index 8f797aa42c7e..7ef6be2897f0 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/planner/TestIterativePlanFragmenter.java @@ -58,6 +58,7 @@ import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; import com.facebook.presto.spi.plan.ProjectNode; +import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.spi.plan.StageExecutionDescriptor; import com.facebook.presto.spi.plan.TableScanNode; import com.facebook.presto.spi.relation.RowExpression; @@ -73,7 +74,9 @@ import com.facebook.presto.sql.planner.TypeProvider; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.tpch.TpchColumnHandle; import com.facebook.presto.tpch.TpchTableHandle; import com.facebook.presto.tpch.TpchTableLayoutHandle; @@ -97,6 +100,7 @@ import java.util.Set; import java.util.function.Function; +import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.presto.SystemSessionProperties.FORCE_SINGLE_NODE_OUTPUT; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.metadata.MetadataManager.createTestMetadataManager; @@ -130,6 +134,7 @@ public class TestIterativePlanFragmenter private FinalizerService finalizerService; private NodeScheduler nodeScheduler; private NodePartitioningManager nodePartitioningManager; + private PlanCheckerProviderManager planCheckerProviderManager; @BeforeClass public void setUp() @@ -157,7 +162,8 @@ public void setUp() new SimpleTtlNodeSelectorConfig()); PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager, new NodeSelectionStats()); - planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig()); + planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(jsonCodec(SimplePlanFragment.class))); + planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new FeaturesConfig(), planCheckerProviderManager); } @AfterClass(alwaysRun = true) @@ -224,7 +230,7 @@ private Void runTestIterativePlanFragmenter(PlanNode node, Plan plan, SubPlan fu plan, testingFragmentTracker::isFragmentFinished, metadata, - new PlanChecker(new FeaturesConfig()), + new PlanChecker(new FeaturesConfig(), planCheckerProviderManager), new PlanNodeIdAllocator(), nodePartitioningManager, new QueryManagerConfig(), diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java index 81e8f55b0a66..aa976b80cac5 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -142,4 +143,9 @@ default Iterable getNodeStatusNotificatio { return emptyList(); } + + default Iterable getPlanCheckerProviderFactories() + { + return emptyList(); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java new file mode 100644 index 000000000000..16789510c62e --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanChecker.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.spi.plan; + +import com.facebook.presto.spi.WarningCollector; + +public interface PlanChecker +{ + void validate(PlanNode planNode, WarningCollector warningCollector); + + default void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector) + { + validate(planFragment.getRoot(), warningCollector); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java new file mode 100644 index 000000000000..2391c542776e --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.spi.plan; + +import java.util.Collections; +import java.util.List; + +public interface PlanCheckerProvider +{ + default List getIntermediatePlanCheckers() + { + return Collections.emptyList(); + } + + default List getFinalPlanCheckers() + { + return Collections.emptyList(); + } + + default List getFragmentPlanCheckers() + { + return Collections.emptyList(); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java new file mode 100644 index 000000000000..11d4cb726b2b --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderFactory.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.plan; + +public interface PlanCheckerProviderFactory +{ + String getName(); + + PlanCheckerProvider create(SimplePlanFragmentSerde simplePlanFragmentSerde); +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java new file mode 100644 index 000000000000..09e9f4443de4 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/SimplePlanFragmentSerde.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.plan; + +public interface SimplePlanFragmentSerde +{ + String serialize(SimplePlanFragment planFragment); + + SimplePlanFragment deserialize(String value); +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java index 1757ba65e8d5..184330bd4854 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java @@ -572,14 +572,14 @@ private QueryExplainer getQueryExplainer() .getPlanningTimeOptimizers(); return new QueryExplainer( optimizers, - new PlanFragmenter(metadata, queryRunner.getNodePartitioningManager(), new QueryManagerConfig(), featuresConfig), + new PlanFragmenter(metadata, queryRunner.getNodePartitioningManager(), new QueryManagerConfig(), featuresConfig, queryRunner.getPlanCheckerProviderManager()), metadata, queryRunner.getAccessControl(), sqlParser, queryRunner.getStatsCalculator(), costCalculator, ImmutableMap.of(), - new PlanChecker(featuresConfig, false)); + new PlanChecker(featuresConfig, false, queryRunner.getPlanCheckerProviderManager())); } protected static void skipTestUnless(boolean requirement) diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index 8b5f88303ee0..5329d380ce9e 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -46,6 +46,7 @@ import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.TestingAccessControlManager; @@ -581,6 +582,13 @@ public TestingAccessControlManager getAccessControl() return coordinators.get(0).getAccessControl(); } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + checkState(coordinators.size() == 1, "Expected a single coordinator"); + return coordinators.get(0).getPlanCheckerProviderManager(); + } + public TestingPrestoServer getCoordinator() { checkState(coordinators.size() == 1, "Expected a single coordinator"); diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/StandaloneQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/StandaloneQueryRunner.java index 05e87aa335de..980fb4f991bc 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/StandaloneQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/StandaloneQueryRunner.java @@ -30,6 +30,7 @@ import com.facebook.presto.sql.parser.SqlParserOptions; import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.TestingAccessControlManager; @@ -163,6 +164,12 @@ public ConnectorPlanOptimizerManager getPlanOptimizerManager() return server.getPlanOptimizerManager(); } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + return server.getPlanCheckerProviderManager(); + } + @Override public StatsCalculator getStatsCalculator() { diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java index 478b7566cde5..c03ee3b3104e 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java @@ -44,11 +44,13 @@ import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static com.facebook.airlift.http.client.Request.Builder.prepareGet; import static com.facebook.airlift.http.client.Request.Builder.preparePut; import static com.facebook.airlift.http.client.StringResponseHandler.createStringResponseHandler; import static com.facebook.airlift.testing.Closeables.closeQuietly; +import static com.facebook.presto.SystemSessionProperties.EAGER_PLAN_VALIDATION_ENABLED; import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT; import static com.facebook.presto.execution.QueryState.FAILED; import static com.facebook.presto.execution.QueryState.FINISHED; @@ -374,6 +376,49 @@ public void testQueuedQueryInteraction() assertEquals(queryInfo.getErrorCode(), ADMINISTRATIVELY_PREEMPTED.toErrorCode()); } + @Test(timeOut = 240_000) + public void testEagerPlanValidation() + throws Exception + { + AtomicBoolean triggerValidationFailure = new AtomicBoolean(); + + queryRunner.installPlugin(new ResourceGroupManagerPlugin()); + queryRunner.installPlugin(new TestingPlanCheckerProviderPlugin(triggerValidationFailure)); + queryRunner.getPlanCheckerProviderManager().loadPlanCheckerProviders(); + queryRunner.getCoordinator().getResourceGroupManager().get().forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_eager_plan_validation.json"))); + + Session.SessionBuilder builder = testSessionBuilder() + .setCatalog("tpch") + .setSchema("sf100000") + .setSource("eager") + .setSystemProperty(EAGER_PLAN_VALIDATION_ENABLED, "true"); + + Session firstSession = builder.setQueryId(QueryId.valueOf("20240930_203743_00001_11111")).build(); + QueryId firstQuery = createQuery(queryRunner, firstSession, LONG_LASTING_QUERY); + waitForQueryState(queryRunner, firstQuery, RUNNING); + + Session secondSession = builder.setQueryId(QueryId.valueOf("20240930_203743_00002_22222")).build(); + QueryId secondQuery = createQuery(queryRunner, secondSession, LONG_LASTING_QUERY); + waitForQueryState(queryRunner, secondQuery, QUEUED); + + Session thirdSession = builder.setQueryId(QueryId.valueOf("20240930_203743_00003_33333")).build(); + QueryId thirdQuery = createQuery(queryRunner, thirdSession, LONG_LASTING_QUERY); + + // Force failure during plan validation after queuing has begun + triggerValidationFailure.set(true); + waitForQueryState(queryRunner, thirdQuery, FAILED); + + DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager(); + BasicQueryInfo queryInfo = dispatchManager.getQueryInfo(thirdQuery); + assertEquals(queryInfo.getErrorCode(), TriggerFailurePlanChecker.FAILURE_ERROR_CODE.toErrorCode()); + assertNotNull(queryInfo.getFailureInfo()); + assertNotNull(queryInfo.getFailureInfo().getMessage()); + assertEquals(queryInfo.getFailureInfo().getMessage(), TriggerFailurePlanChecker.FAILURE_MESSAGE); + + cancelQuery(queryRunner, secondQuery); + cancelQuery(queryRunner, firstQuery); + } + private void assertResourceGroup(DistributedQueryRunner queryRunner, Session session, String query, ResourceGroupId expectedResourceGroup) throws InterruptedException { diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java new file mode 100644 index 000000000000..050bebcd44a9 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestingPlanCheckerProviderPlugin + implements Plugin, PlanCheckerProviderFactory, PlanCheckerProvider +{ + private final AtomicBoolean triggerValidationFailure; + + public TestingPlanCheckerProviderPlugin(AtomicBoolean triggerValidationFailure) + { + this.triggerValidationFailure = triggerValidationFailure; + } + + @Override + public Iterable getPlanCheckerProviderFactories() + { + return ImmutableList.of(this); + } + + @Override + public String getName() + { + return "TestPlanCheckers"; + } + + @Override + public PlanCheckerProvider create(SimplePlanFragmentSerde simplePlanFragmentSerde) + { + return this; + } + + @Override + public List getIntermediatePlanCheckers() + { + return ImmutableList.of(new TriggerFailurePlanChecker(triggerValidationFailure)); + } +} diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TriggerFailurePlanChecker.java b/presto-tests/src/test/java/com/facebook/presto/execution/TriggerFailurePlanChecker.java new file mode 100644 index 000000000000..c2e64a16b56b --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TriggerFailurePlanChecker.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution; + +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.StandardErrorCode; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanNode; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; + +public class TriggerFailurePlanChecker + implements PlanChecker +{ + public static final StandardErrorCode FAILURE_ERROR_CODE = QUERY_REJECTED; + public static final String FAILURE_MESSAGE = "Plan validation failure triggered"; + private final AtomicBoolean triggerValidationFailure; + + public TriggerFailurePlanChecker(AtomicBoolean triggerValidationFailure) + { + this.triggerValidationFailure = triggerValidationFailure; + } + + @Override + public void validate(PlanNode planNode, WarningCollector warningCollector) + { + if (triggerValidationFailure.get()) { + throw new PrestoException(FAILURE_ERROR_CODE, FAILURE_MESSAGE); + } + } +} diff --git a/presto-tests/src/test/resources/resource_groups_config_eager_plan_validation.json b/presto-tests/src/test/resources/resource_groups_config_eager_plan_validation.json new file mode 100644 index 000000000000..de4655f643ca --- /dev/null +++ b/presto-tests/src/test/resources/resource_groups_config_eager_plan_validation.json @@ -0,0 +1,32 @@ +{ + "rootGroups": [ + { + "name": "global", + "softMemoryLimit": "1MB", + "hardConcurrencyLimit": 100, + "maxQueued": 1000, + "subGroups": [ + { + "name": "user-${USER}", + "softMemoryLimit": "1MB", + "hardConcurrencyLimit": 3, + "maxQueued": 3, + "subGroups": [ + { + "name": "eager-${USER}", + "softMemoryLimit": "1MB", + "hardConcurrencyLimit": 1, + "maxQueued": 2 + } + ] + } + ] + } + ], + "selectors": [ + { + "source": "(?i).*eager.*", + "group": "global.user-${USER}.eager-${USER}" + } + ] +} diff --git a/presto-thrift-connector/src/test/java/com/facebook/presto/connector/thrift/integration/ThriftQueryRunner.java b/presto-thrift-connector/src/test/java/com/facebook/presto/connector/thrift/integration/ThriftQueryRunner.java index 7f47f65ff49e..c9a4f82b7cd8 100644 --- a/presto-thrift-connector/src/test/java/com/facebook/presto/connector/thrift/integration/ThriftQueryRunner.java +++ b/presto-thrift-connector/src/test/java/com/facebook/presto/connector/thrift/integration/ThriftQueryRunner.java @@ -36,6 +36,7 @@ import com.facebook.presto.split.SplitManager; import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.TestingAccessControlManager; @@ -229,6 +230,12 @@ public ConnectorPlanOptimizerManager getPlanOptimizerManager() return source.getPlanOptimizerManager(); } + @Override + public PlanCheckerProviderManager getPlanCheckerProviderManager() + { + return source.getPlanCheckerProviderManager(); + } + @Override public StatsCalculator getStatsCalculator() {