From 49c9b7eff6efe225beea1395332c34e6482f4e9c Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 28 Oct 2024 18:07:59 -0700 Subject: [PATCH] Add native plan checker to sidecar plugin This adds a provider for a native plan checker that will send a plan fragment to the native sidecar where it is validated by performing a conversion to a Velox plan. If the conversion succeeds the query will continue, if it fails then the query will fail with an error from the native sidecar. The provider is added to the native sidecar plugin and is enabled with the config `native-plan-checker.plan-validation-enabled=true` from filename `etc/plan-checker-providers/native-plan-checker.properties`. See also: prestodb#23649 RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md --- .../connector/ConnectorAwareNodeManager.java | 1 + .../facebook/presto/server/PluginManager.java | 10 +- .../facebook/presto/server/PrestoServer.java | 13 +- .../sql/planner/sanity/PlanChecker.java | 23 +- .../sanity/PlanCheckerProviderManager.java | 11 +- .../TestPlanCheckerProviderManager.java | 5 +- presto-native-sidecar-plugin/pom.xml | 27 ++ .../presto/sidecar/NativeSidecarPlugin.java | 17 ++ .../nativechecker/NativePlanChecker.java | 154 ++++++++++ .../NativePlanCheckerConfig.java | 36 +++ .../NativePlanCheckerErrorCode.java | 42 +++ .../NativePlanCheckerProvider.java | 51 ++++ .../NativePlanCheckerProviderFactory.java | 76 +++++ .../PlanConversionFailureInfo.java | 172 ++++++++++++ .../nativechecker/PlanConversionResponse.java | 40 +++ .../SimplePlanFragmentSerializer.java | 52 ++++ .../sidecar/TestPlanCheckerProvider.java | 265 ++++++++++++++++++ .../presto/spi/CoordinatorPlugin.java | 14 +- .../java/com/facebook/presto/spi/Plugin.java | 6 - .../spi/plan/PlanCheckerProviderContext.java | 11 +- .../facebook/presto/execution/TestQueues.java | 8 +- .../TestingPlanCheckerProviderPlugin.java | 4 +- 22 files changed, 994 insertions(+), 44 deletions(-) create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerErrorCode.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProvider.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProviderFactory.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionFailureInfo.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionResponse.java create mode 100644 presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/SimplePlanFragmentSerializer.java create mode 100644 presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java diff --git a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java index b0b092b4a306a..e7e232f3781f6 100644 --- a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorAwareNodeManager.java @@ -48,6 +48,7 @@ public Set getAllNodes() return ImmutableSet.builder() .addAll(getWorkerNodes()) .addAll(nodeManager.getCoordinators()) + .addAll(nodeManager.getCoordinatorSidecars()) .build(); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java index e71d1ee561994..f1cef40a1a3f6 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java @@ -354,11 +354,6 @@ public void installPlugin(Plugin plugin) log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName()); nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory); } - - for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) { - log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName()); - planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory); - } } public void installCoordinatorPlugin(CoordinatorPlugin plugin) @@ -372,6 +367,11 @@ public void installCoordinatorPlugin(CoordinatorPlugin plugin) log.info("Registering system session property provider factory %s", providerFactory.getName()); metadata.getSessionPropertyManager().addSessionPropertyProviderFactory(providerFactory); } + + for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) { + log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName()); + planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory); + } } private URLClassLoader buildClassLoader(String plugin) diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 5d681d829ffa4..1c0ce0dff9bbc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -26,6 +26,7 @@ import com.facebook.airlift.json.smile.SmileModule; import com.facebook.airlift.log.LogJmxModule; import com.facebook.airlift.log.Logger; +import com.facebook.airlift.node.NodeInfo; import com.facebook.airlift.node.NodeModule; import com.facebook.airlift.tracetoken.TraceTokenModule; import com.facebook.drift.server.DriftServer; @@ -39,9 +40,11 @@ import com.facebook.presto.execution.warnings.WarningCollectorModule; import com.facebook.presto.metadata.Catalog; import com.facebook.presto.metadata.CatalogManager; -import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.metadata.DiscoveryNodeManager; +import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.StaticCatalogStore; import com.facebook.presto.metadata.StaticFunctionNamespaceStore; +import com.facebook.presto.nodeManager.PluginNodeManager; import com.facebook.presto.security.AccessControlManager; import com.facebook.presto.security.AccessControlModule; import com.facebook.presto.server.security.PasswordAuthenticatorManager; @@ -179,8 +182,12 @@ public void run() injector.getInstance(TracerProviderManager.class).loadTracerProvider(); injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider(); injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification(); - injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders(); - injector.getInstance(SessionPropertyManager.class).loadSessionPropertyProviders(); + PlanCheckerProviderManager planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class); + InternalNodeManager nodeManager = injector.getInstance(DiscoveryNodeManager.class); + NodeInfo nodeInfo = injector.getInstance(NodeInfo.class); + PluginNodeManager pluginNodeManager = new PluginNodeManager(nodeManager, nodeInfo.getEnvironment()); + planCheckerProviderManager.loadPlanCheckerProviders(pluginNodeManager); + startAssociatedProcesses(injector); injector.getInstance(Announcer.class).start(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java index 20495092f9141..37918715db4a7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java @@ -107,7 +107,15 @@ public void validatePlanFragment(PlanFragment planFragment, Session session, Met checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector)); for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) { for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFragmentPlanCheckers()) { - checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector); + checker.validateFragment(new SimplePlanFragment( + planFragment.getId(), + planFragment.getRoot(), + planFragment.getVariables(), + planFragment.getPartitioning(), + planFragment.getTableScanSchedulingOrder(), + planFragment.getPartitioningScheme(), + planFragment.getStageExecutionDescriptor(), + planFragment.isOutputTableWriterFragment()), warningCollector); } } } @@ -126,17 +134,4 @@ private enum Stage { INTERMEDIATE, FINAL, FRAGMENT } - - private static SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment) - { - return new SimplePlanFragment( - planFragment.getId(), - planFragment.getRoot(), - planFragment.getVariables(), - planFragment.getPartitioning(), - planFragment.getTableScanSchedulingOrder(), - planFragment.getPartitioningScheme(), - planFragment.getStageExecutionDescriptor(), - planFragment.isOutputTableWriterFragment()); - } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java index bd2d0d0bdb20f..1349a1dcd301c 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanCheckerProviderManager.java @@ -14,6 +14,7 @@ package com.facebook.presto.sql.planner.sanity; import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.NodeManager; import com.facebook.presto.spi.plan.PlanCheckerProvider; import com.facebook.presto.spi.plan.PlanCheckerProviderContext; import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; @@ -40,7 +41,7 @@ public class PlanCheckerProviderManager private static final Logger log = Logger.get(PlanCheckerProviderManager.class); private static final String PLAN_CHECKER_PROVIDER_NAME = "plan-checker-provider.name"; - private final PlanCheckerProviderContext planCheckerProviderContext; + private final SimplePlanFragmentSerde simplePlanFragmentSerde; private final Map providerFactories = new ConcurrentHashMap<>(); private final CopyOnWriteArrayList providers = new CopyOnWriteArrayList<>(); private final File configDirectory; @@ -48,7 +49,7 @@ public class PlanCheckerProviderManager @Inject public PlanCheckerProviderManager(SimplePlanFragmentSerde simplePlanFragmentSerde, PlanCheckerProviderManagerConfig config) { - this.planCheckerProviderContext = new PlanCheckerProviderContext(requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null")); + this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"); requireNonNull(config, "config is null"); this.configDirectory = requireNonNull(config.getPlanCheckerConfigurationDir(), "configDirectory is null"); } @@ -61,9 +62,11 @@ public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory planChecker } } - public void loadPlanCheckerProviders() + public void loadPlanCheckerProviders(NodeManager nodeManager) throws IOException { + PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager); + for (File file : listFiles(configDirectory)) { if (file.isFile() && file.getName().endsWith(".properties")) { // unlike function namespaces and connectors, we don't have a concept of catalog @@ -75,7 +78,7 @@ public void loadPlanCheckerProviders() file.getAbsoluteFile(), PLAN_CHECKER_PROVIDER_NAME); String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME); - log.info("-- Loading plan checker provider %s--", planCheckerProviderName); + log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName); PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName); checkState(providerFactory != null, "No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet()); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestPlanCheckerProviderManager.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestPlanCheckerProviderManager.java index 7a520f7e4d58d..cd0dfdc15d161 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestPlanCheckerProviderManager.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/sanity/TestPlanCheckerProviderManager.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.plan.SimplePlanFragment; import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde; +import com.facebook.presto.testing.TestingNodeManager; import com.google.common.collect.ImmutableList; import org.testng.annotations.Test; @@ -40,7 +41,7 @@ public void testLoadPlanCheckerProviders() .setPlanCheckerConfigurationDir(new File("src/test/resources/plan-checkers")); PlanCheckerProviderManager planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(JsonCodec.jsonCodec(SimplePlanFragment.class)), planCheckerProviderManagerConfig); planCheckerProviderManager.addPlanCheckerProviderFactory(new TestingPlanCheckerProviderFactory()); - planCheckerProviderManager.loadPlanCheckerProviders(); + planCheckerProviderManager.loadPlanCheckerProviders(new TestingNodeManager()); assertEquals(planCheckerProviderManager.getPlanCheckerProviders(), ImmutableList.of(TESTING_PLAN_CHECKER_PROVIDER)); } @@ -51,7 +52,7 @@ public void testLoadUnregisteredPlanCheckerProvider() PlanCheckerProviderManagerConfig planCheckerProviderManagerConfig = new PlanCheckerProviderManagerConfig() .setPlanCheckerConfigurationDir(new File("src/test/resources/plan-checkers")); PlanCheckerProviderManager planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(JsonCodec.jsonCodec(SimplePlanFragment.class)), planCheckerProviderManagerConfig); - planCheckerProviderManager.loadPlanCheckerProviders(); + planCheckerProviderManager.loadPlanCheckerProviders(new TestingNodeManager()); } public static class TestingPlanCheckerProviderFactory diff --git a/presto-native-sidecar-plugin/pom.xml b/presto-native-sidecar-plugin/pom.xml index 2d68f7b27c117..f3e82500967e5 100644 --- a/presto-native-sidecar-plugin/pom.xml +++ b/presto-native-sidecar-plugin/pom.xml @@ -52,6 +52,11 @@ log-manager + + com.squareup.okhttp3 + okhttp + + com.facebook.presto @@ -77,6 +82,16 @@ provided + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.core + jackson-core + + com.facebook.drift drift-api @@ -95,6 +110,12 @@ provided + + com.google.code.findbugs + jsr305 + true + + com.facebook.presto @@ -182,6 +203,12 @@ annotations test + + + com.squareup.okhttp3 + mockwebserver + test + diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/NativeSidecarPlugin.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/NativeSidecarPlugin.java index 4c11cfa9b34dd..c719f5a2db809 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/NativeSidecarPlugin.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/NativeSidecarPlugin.java @@ -13,8 +13,10 @@ */ package com.facebook.presto.sidecar; +import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerProviderFactory; import com.facebook.presto.sidecar.sessionpropertyproviders.NativeSystemSessionPropertyProviderFactory; import com.facebook.presto.spi.CoordinatorPlugin; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory; import com.google.common.collect.ImmutableList; @@ -26,4 +28,19 @@ public Iterable getWorkerSessionPropertyPr { return ImmutableList.of(new NativeSystemSessionPropertyProviderFactory()); } + + @Override + public Iterable getPlanCheckerProviderFactories() + { + return ImmutableList.of(new NativePlanCheckerProviderFactory(getClassLoader())); + } + + private static ClassLoader getClassLoader() + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = NativeSidecarPlugin.class.getClassLoader(); + } + return classLoader; + } } diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java new file mode 100644 index 0000000000000..c618640cecf59 --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanChecker.java @@ -0,0 +1,154 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanVisitor; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.TableScanNode; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import java.io.IOException; + +import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_CONNECTION_ERROR; +import static com.facebook.presto.sidecar.nativechecker.NativePlanCheckerErrorCode.NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE; +import static com.google.common.base.MoreObjects.firstNonNull; +import static java.util.Objects.requireNonNull; + +/** + * Uses the native sidecar to check verify a plan can be run on a native worker. + */ +public final class NativePlanChecker + implements PlanChecker +{ + private static final Logger LOG = Logger.get(NativePlanChecker.class); + private static final MediaType JSON_CONTENT_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final JsonCodec PLAN_CONVERSION_RESPONSE_JSON_CODEC = JsonCodec.jsonCodec(PlanConversionResponse.class); + public static final String PLAN_CONVERSION_ENDPOINT = "/v1/velox/plan"; + + private final NodeManager nodeManager; + private final JsonCodec planFragmentJsonCodec; + private final OkHttpClient httpClient; + + public NativePlanChecker(NodeManager nodeManager, JsonCodec planFragmentJsonCodec) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.planFragmentJsonCodec = requireNonNull(planFragmentJsonCodec, "planFragmentJsonCodec is null"); + this.httpClient = new OkHttpClient.Builder().build(); + } + + @Override + public void validate(PlanNode planNode, WarningCollector warningCollector) + { + // NO-OP, only validating fragments + } + + @Override + public void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector) + { + if (!planFragment.getPartitioning().isCoordinatorOnly() && !isInternalSystemConnector(planFragment.getRoot())) { + runValidation(planFragment); + } + else { + LOG.debug("Skipping native plan validation [fragment: %s, root: %s]", planFragment.getId(), planFragment.getRoot().getId()); + } + } + + private boolean isInternalSystemConnector(PlanNode planNode) + { + return planNode.accept(new CheckInternalVisitor(), null); + } + + private void runValidation(SimplePlanFragment planFragment) + { + LOG.debug("Starting native plan validation [fragment: %s, root: %s]", planFragment.getId(), planFragment.getRoot().getId()); + String requestBodyJson = planFragmentJsonCodec.toJson(planFragment); + final Request request = buildRequest(requestBodyJson); + + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + PlanConversionFailureInfo failure = processResponseFailure(response); + String message = String.format("Error from native plan checker: %s", firstNonNull(failure.getMessage(), "Internal error")); + throw new PrestoException(failure::getErrorCode, message, failure.toException()); + } + } + catch (final IOException e) { + throw new PrestoException(NATIVEPLANCHECKER_CONNECTION_ERROR, "I/O error getting native plan checker response", e); + } + finally { + LOG.debug("Native plan validation complete [fragment: %s, root: %s]", planFragment.getId(), planFragment.getRoot().getId()); + } + } + + private Request buildRequest(String requestBodyJson) + { + // Use native sidecar plan conversion endpoint to validate + String planConversionUrl = nodeManager.getSidecarNode().getHttpUri().toString() + PLAN_CONVERSION_ENDPOINT; + + Request.Builder builder = new Request.Builder() + .url(planConversionUrl) + .addHeader("CONTENT_TYPE", "APPLICATION_JSON") + .post(RequestBody.create(JSON_CONTENT_TYPE, requestBodyJson)); + + return builder.build(); + } + + private PlanConversionFailureInfo processResponseFailure(Response response) throws IOException + { + if (response.body() == null) { + throw new PrestoException(NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE, "Error response without failure from native plan checker with code: " + response.code()); + } + + PlanConversionResponse planConversionResponse = PLAN_CONVERSION_RESPONSE_JSON_CODEC.fromJson(response.body().bytes()); + if (planConversionResponse.getFailures().isEmpty()) { + throw new PrestoException(NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE, "Error response without failure from native plan checker with code: " + response.code()); + } + + return planConversionResponse.getFailures().get(0); + } + + private static class CheckInternalVisitor + extends PlanVisitor + { + @Override + public Boolean visitTableScan(TableScanNode tableScan, Void context) + { + TableHandle handle = tableScan.getTable(); + return ConnectorId.isInternalSystemConnector(handle.getConnectorId()); + } + + @Override + public Boolean visitPlan(PlanNode node, Void context) + { + for (PlanNode child : node.getSources()) { + if (child.accept(this, context)) { + return true; + } + } + return false; + } + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java new file mode 100644 index 0000000000000..62d4c47b84f20 --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +public class NativePlanCheckerConfig +{ + public static final String CONFIG_PREFIX = "native-plan-checker"; + private boolean enabled; + + public boolean isPlanValidationEnabled() + { + return enabled; + } + + @Config("plan-validation-enabled") + @ConfigDescription("Set true to enable native plan validation") + public NativePlanCheckerConfig setPlanValidationEnabled(boolean enabled) + { + this.enabled = enabled; + return this; + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerErrorCode.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerErrorCode.java new file mode 100644 index 0000000000000..5562b05ceba02 --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerErrorCode.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.common.ErrorType; +import com.facebook.presto.spi.ErrorCodeSupplier; + +import static com.facebook.presto.common.ErrorType.EXTERNAL; +import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR; + +public enum NativePlanCheckerErrorCode + implements ErrorCodeSupplier +{ + NATIVEPLANCHECKER_CONNECTION_ERROR(0, EXTERNAL), + NATIVEPLANCHECKER_UNKNOWN_CONVERSION_FAILURE(1, INTERNAL_ERROR), + NATIVEPLANCHECKER_RESPONSE_MISSING_BODY(2, INTERNAL_ERROR); + + private final ErrorCode errorCode; + + NativePlanCheckerErrorCode(int code, ErrorType type) + { + errorCode = new ErrorCode(code + 0x0519_0000, name(), type); + } + + @Override + public ErrorCode toErrorCode() + { + return errorCode; + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProvider.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProvider.java new file mode 100644 index 0000000000000..aeba3c77f4939 --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.plan.PlanChecker; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class NativePlanCheckerProvider + implements PlanCheckerProvider +{ + private final NodeManager nodeManager; + private final JsonCodec planFragmentJsonCodec; + private final NativePlanCheckerConfig config; + + @Inject + public NativePlanCheckerProvider(NodeManager nodeManager, JsonCodec planFragmentJsonCodec, NativePlanCheckerConfig config) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.planFragmentJsonCodec = requireNonNull(planFragmentJsonCodec, "planFragmentJsonCodec is null"); + this.config = requireNonNull(config, "config is null"); + } + + @Override + public List getFragmentPlanCheckers() + { + return config.isPlanValidationEnabled() ? + ImmutableList.of(new NativePlanChecker(nodeManager, planFragmentJsonCodec)) : + ImmutableList.of(); + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProviderFactory.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProviderFactory.java new file mode 100644 index 0000000000000..ce41294d40a1d --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/NativePlanCheckerProviderFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.plan.PlanCheckerProvider; +import com.facebook.presto.spi.plan.PlanCheckerProviderContext; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.google.inject.Injector; +import com.google.inject.Scopes; + +import java.util.Map; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static java.util.Objects.requireNonNull; + +public class NativePlanCheckerProviderFactory + implements PlanCheckerProviderFactory +{ + private final ClassLoader classLoader; + + public NativePlanCheckerProviderFactory(ClassLoader classLoader) + { + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public String getName() + { + return "native"; + } + + @Override + public PlanCheckerProvider create(Map properties, PlanCheckerProviderContext planCheckerProviderContext) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap( + binder -> { + configBinder(binder).bindConfig(NativePlanCheckerConfig.class, NativePlanCheckerConfig.CONFIG_PREFIX); + binder.install(new JsonModule()); + binder.bind(NodeManager.class).toInstance(planCheckerProviderContext.getNodeManager()); + binder.bind(SimplePlanFragmentSerde.class).toInstance(planCheckerProviderContext.getSimplePlanFragmentSerde()); + jsonBinder(binder).addSerializerBinding(SimplePlanFragment.class).to(SimplePlanFragmentSerializer.class).in(Scopes.SINGLETON); + jsonCodecBinder(binder).bindJsonCodec(SimplePlanFragment.class); + binder.bind(PlanCheckerProvider.class).to(NativePlanCheckerProvider.class).in(Scopes.SINGLETON); + }); + + Injector injector = app + .noStrictConfig() + .doNotInitializeLogging() + .setRequiredConfigurationProperties(properties) + .quiet() + .initialize(); + + return injector.getInstance(PlanCheckerProvider.class); + } + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionFailureInfo.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionFailureInfo.java new file mode 100644 index 0000000000000..a0be3a25cb8b3 --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionFailureInfo.java @@ -0,0 +1,172 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.presto.common.ErrorCode; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.util.Objects.requireNonNull; + +/** + * This class provides failure information in the response from the native plan checker. + * It is derived from {@link com.facebook.presto.client.FailureInfo}. + */ +@Immutable +public class PlanConversionFailureInfo +{ + private static final Pattern STACK_TRACE_PATTERN = Pattern.compile("(.*)\\.(.*)\\(([^:]*)(?::(.*))?\\)"); + + private final String type; + private final String message; + private final PlanConversionFailureInfo cause; + private final List suppressed; + private final List stack; + private final ErrorCode errorCode; + + @JsonCreator + public PlanConversionFailureInfo( + @JsonProperty("type") String type, + @JsonProperty("message") String message, + @JsonProperty("cause") PlanConversionFailureInfo cause, + @JsonProperty("suppressed") List suppressed, + @JsonProperty("stack") List stack, + @JsonProperty("errorCode") ErrorCode errorCode) + { + requireNonNull(type, "type is null"); + requireNonNull(suppressed, "suppressed is null"); + requireNonNull(stack, "stack is null"); + + this.type = type; + this.message = message; + this.cause = cause; + this.suppressed = ImmutableList.copyOf(suppressed); + this.stack = ImmutableList.copyOf(stack); + this.errorCode = errorCode; + } + + @JsonProperty + public String getType() + { + return type; + } + + @Nullable + @JsonProperty + public String getMessage() + { + return message; + } + + @Nullable + @JsonProperty + public PlanConversionFailureInfo getCause() + { + return cause; + } + + @JsonProperty + public List getSuppressed() + { + return suppressed; + } + + @JsonProperty + public List getStack() + { + return stack; + } + + @Nullable + @JsonProperty + public ErrorCode getErrorCode() + { + return errorCode; + } + + public RuntimeException toException() + { + return toException(this); + } + + private static FailureException toException(PlanConversionFailureInfo failureInfo) + { + if (failureInfo == null) { + return null; + } + FailureException failure = new FailureException(failureInfo.getType(), failureInfo.getMessage(), toException(failureInfo.getCause())); + for (PlanConversionFailureInfo suppressed : failureInfo.getSuppressed()) { + failure.addSuppressed(toException(suppressed)); + } + StackTraceElement[] stackTrace = + failureInfo.getStack().stream().map(PlanConversionFailureInfo::toStackTraceElement).toArray(StackTraceElement[]::new); + failure.setStackTrace(stackTrace); + return failure; + } + + public static StackTraceElement toStackTraceElement(String stack) + { + Matcher matcher = STACK_TRACE_PATTERN.matcher(stack); + if (matcher.matches()) { + String declaringClass = matcher.group(1); + String methodName = matcher.group(2); + String fileName = matcher.group(3); + int number = -1; + if (fileName.equals("Native Method")) { + fileName = null; + number = -2; + } + else if (matcher.group(4) != null) { + number = Integer.parseInt(matcher.group(4)); + } + return new StackTraceElement(declaringClass, methodName, fileName, number); + } + return new StackTraceElement("Unknown", stack, null, -1); + } + + private static class FailureException + extends RuntimeException + { + private final String type; + + FailureException(String type, String message, FailureException cause) + { + super(message, cause); + this.type = requireNonNull(type, "type is null"); + } + + public String getType() + { + return type; + } + + @Override + public String toString() + { + String message = getMessage(); + if (message != null) { + return type + ": " + message; + } + return type; + } + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionResponse.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionResponse.java new file mode 100644 index 0000000000000..4a0068d2cf05f --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/PlanConversionResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class PlanConversionResponse +{ + private final List failures; + + @JsonCreator + public PlanConversionResponse( + @JsonProperty("failures") List failures) + { + this.failures = ImmutableList.copyOf(requireNonNull(failures, "failures is null")); + } + + @JsonProperty + public List getFailures() + { + return failures; + } +} diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/SimplePlanFragmentSerializer.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/SimplePlanFragmentSerializer.java new file mode 100644 index 0000000000000..ef1c5967d84fb --- /dev/null +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/nativechecker/SimplePlanFragmentSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar.nativechecker; + +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.SimplePlanFragmentSerde; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.TypeSerializer; +import com.google.inject.Inject; + +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +public class SimplePlanFragmentSerializer + extends JsonSerializer +{ + private final SimplePlanFragmentSerde simplePlanFragmentSerde; + + @Inject + public SimplePlanFragmentSerializer(SimplePlanFragmentSerde simplePlanFragmentSerde) + { + this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"); + } + + @Override + public void serialize(SimplePlanFragment planNode, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + throws IOException + { + jsonGenerator.writeRawValue(simplePlanFragmentSerde.serialize(planNode)); + } + + @Override + public void serializeWithType(SimplePlanFragment planNode, JsonGenerator jsonGenerator, SerializerProvider serializerProvider, TypeSerializer typeSerializer) + throws IOException + { + serialize(planNode, jsonGenerator, serializerProvider); + } +} diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java new file mode 100644 index 0000000000000..04744bb0d3922 --- /dev/null +++ b/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestPlanCheckerProvider.java @@ -0,0 +1,265 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sidecar; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.sidecar.nativechecker.NativePlanChecker; +import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerConfig; +import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerProvider; +import com.facebook.presto.sidecar.nativechecker.PlanConversionFailureInfo; +import com.facebook.presto.sidecar.nativechecker.PlanConversionResponse; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.Node; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.NodePoolType; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.StandardErrorCode; +import com.facebook.presto.spi.connector.ConnectorPartitioningHandle; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningHandle; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanFragmentId; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.plan.SimplePlanFragment; +import com.facebook.presto.spi.plan.StageExecutionDescriptor; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + +public class TestPlanCheckerProvider +{ + private static final JsonCodec PLAN_FRAGMENT_JSON_CODEC = JsonCodec.jsonCodec(SimplePlanFragment.class); + private static final JsonCodec PLAN_CONVERSION_RESPONSE_JSON_CODEC = JsonCodec.jsonCodec(PlanConversionResponse.class); + + @Test + public void testGetPlanChecker() + { + NativePlanCheckerConfig config = new NativePlanCheckerConfig(); + assertFalse(config.isPlanValidationEnabled()); + NativePlanCheckerProvider provider = new NativePlanCheckerProvider(new TestingNodeManager(URI.create("localhost")), PLAN_FRAGMENT_JSON_CODEC, config); + assertTrue(provider.getIntermediatePlanCheckers().isEmpty()); + assertTrue(provider.getFinalPlanCheckers().isEmpty()); + assertTrue(provider.getFragmentPlanCheckers().isEmpty()); + + // Enable the native plan checker, should appear in fragment plan checkers + config.setPlanValidationEnabled(true); + assertEquals(provider.getFragmentPlanCheckers().size(), 1); + } + + @Test + public void testNativePlanMockValidate() + throws IOException + { + TestingPlanNode root = new TestingPlanNode(); + ConnectorPartitioningHandle connectorPartitioningHandle = new TestingConnectorPartitioningHandle(); + PartitioningHandle handle = new PartitioningHandle(Optional.empty(), Optional.empty(), connectorPartitioningHandle); + PartitioningScheme partitioningScheme = new PartitioningScheme(new Partitioning(handle, ImmutableList.of()), ImmutableList.of()); + SimplePlanFragment fragment = new SimplePlanFragment(new PlanFragmentId(1), root, ImmutableSet.of(), handle, ImmutableList.of(), partitioningScheme, StageExecutionDescriptor.ungroupedExecution(), false); + + try (MockWebServer server = new MockWebServer()) { + server.start(); + TestingNodeManager nodeManager = new TestingNodeManager(server.url(NativePlanChecker.PLAN_CONVERSION_ENDPOINT).uri()); + NativePlanChecker checker = new NativePlanChecker(nodeManager, PLAN_FRAGMENT_JSON_CODEC); + + PlanConversionResponse responseOk = new PlanConversionResponse(ImmutableList.of()); + String responseOkString = PLAN_CONVERSION_RESPONSE_JSON_CODEC.toJson(responseOk); + server.enqueue(new MockResponse().setBody(responseOkString)); + checker.validateFragment(fragment, null); + + String errorMessage = "native conversion error"; + ErrorCode errorCode = StandardErrorCode.NOT_SUPPORTED.toErrorCode(); + PlanConversionResponse responseError = new PlanConversionResponse(ImmutableList.of(new PlanConversionFailureInfo("MockError", errorMessage, null, ImmutableList.of(), ImmutableList.of(), errorCode))); + String responseErrorString = PLAN_CONVERSION_RESPONSE_JSON_CODEC.toJson(responseError); + server.enqueue(new MockResponse().setResponseCode(500).setBody(responseErrorString)); + PrestoException error = expectThrows(PrestoException.class, + () -> checker.validateFragment(fragment, null)); + assertEquals(error.getErrorCode(), errorCode); + assertTrue(error.getMessage().contains(errorMessage)); + } + } + + public static class TestingConnectorPartitioningHandle + implements ConnectorPartitioningHandle + { + @JsonProperty + @Override + public boolean isCoordinatorOnly() + { + return false; + } + } + + private static class TestingPlanNode + extends PlanNode + { + protected TestingPlanNode() + { + super(Optional.empty(), new PlanNodeId("1"), Optional.empty()); + } + + @Override + public List getSources() + { + return ImmutableList.of(); + } + + @Override + public List getOutputVariables() + { + return ImmutableList.of(); + } + + @Override + public PlanNode replaceChildren(List newChildren) + { + return this; + } + + @Override + public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalentPlanNode) + { + return this; + } + } + + private static class TestingNodeManager + implements NodeManager + { + private final TestSidecarNode sidecarNode; + + public TestingNodeManager(URI sidecarUri) + { + this.sidecarNode = new TestSidecarNode(sidecarUri); + } + + @Override + public Set getAllNodes() + { + return ImmutableSet.of(sidecarNode); + } + + @Override + public Set getWorkerNodes() + { + return Collections.emptySet(); + } + + @Override + public Node getCurrentNode() + { + return null; + } + + @Override + public Node getSidecarNode() + { + return sidecarNode; + } + + @Override + public String getEnvironment() + { + return null; + } + } + + private static class TestSidecarNode + implements Node + { + private final URI sidecarUri; + + public TestSidecarNode(URI sidecarUri) + { + this.sidecarUri = sidecarUri; + } + + @Override + public String getHost() + { + return sidecarUri.getHost(); + } + + @Override + public HostAddress getHostAndPort() + { + return HostAddress.fromUri(sidecarUri); + } + + @Override + public URI getHttpUri() + { + return sidecarUri; + } + + @Override + public String getNodeIdentifier() + { + return "ABC"; + } + + @Override + public String getVersion() + { + return "1"; + } + + @Override + public boolean isCoordinator() + { + return false; + } + + @Override + public boolean isResourceManager() + { + return false; + } + + @Override + public boolean isCatalogServer() + { + return false; + } + + @Override + public boolean isCoordinatorSidecar() + { + return true; + } + + @Override + public NodePoolType getPoolType() + { + return NodePoolType.DEFAULT; + } + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java index 81e57f7e12359..d161738b67bc9 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java @@ -14,16 +14,15 @@ package com.facebook.presto.spi; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; +import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory; import static java.util.Collections.emptyList; /** - * Introduces a new {@link CoordinatorPlugin} interface for native C++ clusters. - * Certain elements of the {@link Plugin} SPI are not used in Prestissimo deployments, or may result in inconsistencies. - * The {@link CoordinatorPlugin} includes only the interfaces relevant to native C++ clusters and - * is a successor to {@link Plugin} and will eventually replace it. - * It contains only interfaces that are valid and used in a coordinator. + * The {@link CoordinatorPlugin} interface allows for plugins to provide additional functionality + * specifically for a coordinator in a Presto cluster. This is a successor to {@link Plugin} for + * coordinator specific plugins and will eventually replace it. */ public interface CoordinatorPlugin { @@ -36,4 +35,9 @@ default Iterable getWorkerSessionPropertyP { return emptyList(); } + + default Iterable getPlanCheckerProviderFactories() + { + return emptyList(); + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java index aa976b80cac5b..81e8f55b0a665 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java @@ -22,7 +22,6 @@ import com.facebook.presto.spi.eventlistener.EventListenerFactory; import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; import com.facebook.presto.spi.nodestatus.NodeStatusNotificationProviderFactory; -import com.facebook.presto.spi.plan.PlanCheckerProviderFactory; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory; import com.facebook.presto.spi.security.PasswordAuthenticatorFactory; @@ -143,9 +142,4 @@ default Iterable getNodeStatusNotificatio { return emptyList(); } - - default Iterable getPlanCheckerProviderFactories() - { - return emptyList(); - } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderContext.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderContext.java index e6218245adbde..7c0d9b3181757 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderContext.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PlanCheckerProviderContext.java @@ -14,19 +14,28 @@ package com.facebook.presto.spi.plan; +import com.facebook.presto.spi.NodeManager; + import static java.util.Objects.requireNonNull; public class PlanCheckerProviderContext { private final SimplePlanFragmentSerde simplePlanFragmentSerde; + private final NodeManager nodeManager; - public PlanCheckerProviderContext(SimplePlanFragmentSerde simplePlanFragmentSerde) + public PlanCheckerProviderContext(SimplePlanFragmentSerde simplePlanFragmentSerde, NodeManager nodeManager) { this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "simplePlanFragmentSerde is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); } public SimplePlanFragmentSerde getSimplePlanFragmentSerde() { return simplePlanFragmentSerde; } + + public NodeManager getNodeManager() + { + return nodeManager; + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java index a634c8eb61daa..4bf0a681e79a2 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestQueues.java @@ -27,12 +27,15 @@ import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.spi.session.ResourceEstimates; import com.facebook.presto.sql.Serialization; +import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager; +import com.facebook.presto.testing.TestingNodeManager; import com.facebook.presto.tests.DistributedQueryRunner; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.inject.Key; import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.testng.annotations.AfterMethod; @@ -388,8 +391,9 @@ public void testEagerPlanValidation() AtomicBoolean triggerValidationFailure = new AtomicBoolean(); queryRunner.installPlugin(new ResourceGroupManagerPlugin()); - queryRunner.installPlugin(new TestingPlanCheckerProviderPlugin(triggerValidationFailure)); - queryRunner.getPlanCheckerProviderManager().loadPlanCheckerProviders(); + queryRunner.installCoordinatorPlugin(new TestingPlanCheckerProviderPlugin(triggerValidationFailure)); + PlanCheckerProviderManager planCheckerProviderManager = queryRunner.getCoordinator().getInstance(Key.get(PlanCheckerProviderManager.class)); + planCheckerProviderManager.loadPlanCheckerProviders(new TestingNodeManager()); queryRunner.getCoordinator().getResourceGroupManager().get().forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_eager_plan_validation.json"))); Session.SessionBuilder builder = testSessionBuilder() diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java b/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java index 75711b41f80e2..3799b8f6e8e7b 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/TestingPlanCheckerProviderPlugin.java @@ -13,7 +13,7 @@ */ package com.facebook.presto.execution; -import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.CoordinatorPlugin; import com.facebook.presto.spi.plan.PlanChecker; import com.facebook.presto.spi.plan.PlanCheckerProvider; import com.facebook.presto.spi.plan.PlanCheckerProviderContext; @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class TestingPlanCheckerProviderPlugin - implements Plugin, PlanCheckerProviderFactory, PlanCheckerProvider + implements CoordinatorPlugin, PlanCheckerProviderFactory, PlanCheckerProvider { private final AtomicBoolean triggerValidationFailure;