Skip to content

Commit

Permalink
Add native plan checker to sidecar plugin
Browse files Browse the repository at this point in the history
This adds a provider for a native plan checker that will send a plan fragment
to the native sidecar where it is validated by performing a conversion to a
Velox plan. If the conversion succeeds the query will continue, if it fails
then the query will fail with an error from the native sidecar. The provider
is added to the native sidecar plugin and is enabled with the config
`native-plan-checker.plan-validation-enabled=true` from filename
`etc/plan-checker-providers/native-plan-checker.properties`.

See also: #23649
RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
  • Loading branch information
BryanCutler committed Nov 15, 2024
1 parent 93526d3 commit 49c9b7e
Show file tree
Hide file tree
Showing 22 changed files with 994 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public Set<Node> getAllNodes()
return ImmutableSet.<Node>builder()
.addAll(getWorkerNodes())
.addAll(nodeManager.getCoordinators())
.addAll(nodeManager.getCoordinatorSidecars())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,6 @@ public void installPlugin(Plugin plugin)
log.info("Registering node status notification provider %s", nodeStatusNotificationProviderFactory.getName());
nodeStatusNotificationManager.addNodeStatusNotificationProviderFactory(nodeStatusNotificationProviderFactory);
}

for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) {
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
Expand All @@ -372,6 +367,11 @@ public void installCoordinatorPlugin(CoordinatorPlugin plugin)
log.info("Registering system session property provider factory %s", providerFactory.getName());
metadata.getSessionPropertyManager().addSessionPropertyProviderFactory(providerFactory);
}

for (PlanCheckerProviderFactory planCheckerProviderFactory : plugin.getPlanCheckerProviderFactories()) {
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.airlift.json.smile.SmileModule;
import com.facebook.airlift.log.LogJmxModule;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.airlift.node.NodeModule;
import com.facebook.airlift.tracetoken.TraceTokenModule;
import com.facebook.drift.server.DriftServer;
Expand All @@ -39,9 +40,11 @@
import com.facebook.presto.execution.warnings.WarningCollectorModule;
import com.facebook.presto.metadata.Catalog;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.metadata.DiscoveryNodeManager;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.StaticCatalogStore;
import com.facebook.presto.metadata.StaticFunctionNamespaceStore;
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.security.AccessControlModule;
import com.facebook.presto.server.security.PasswordAuthenticatorManager;
Expand Down Expand Up @@ -179,8 +182,12 @@ public void run()
injector.getInstance(TracerProviderManager.class).loadTracerProvider();
injector.getInstance(NodeStatusNotificationManager.class).loadNodeStatusNotificationProvider();
injector.getInstance(GracefulShutdownHandler.class).loadNodeStatusNotification();
injector.getInstance(PlanCheckerProviderManager.class).loadPlanCheckerProviders();
injector.getInstance(SessionPropertyManager.class).loadSessionPropertyProviders();
PlanCheckerProviderManager planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
InternalNodeManager nodeManager = injector.getInstance(DiscoveryNodeManager.class);
NodeInfo nodeInfo = injector.getInstance(NodeInfo.class);
PluginNodeManager pluginNodeManager = new PluginNodeManager(nodeManager, nodeInfo.getEnvironment());
planCheckerProviderManager.loadPlanCheckerProviders(pluginNodeManager);

startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,15 @@ public void validatePlanFragment(PlanFragment planFragment, Session session, Met
checkers.get(Stage.FRAGMENT).forEach(checker -> checker.validateFragment(planFragment, session, metadata, warningCollector));
for (PlanCheckerProvider provider : planCheckerProviderManager.getPlanCheckerProviders()) {
for (com.facebook.presto.spi.plan.PlanChecker checker : provider.getFragmentPlanCheckers()) {
checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector);
checker.validateFragment(new SimplePlanFragment(
planFragment.getId(),
planFragment.getRoot(),
planFragment.getVariables(),
planFragment.getPartitioning(),
planFragment.getTableScanSchedulingOrder(),
planFragment.getPartitioningScheme(),
planFragment.getStageExecutionDescriptor(),
planFragment.isOutputTableWriterFragment()), warningCollector);
}
}
}
Expand All @@ -126,17 +134,4 @@ private enum Stage
{
INTERMEDIATE, FINAL, FRAGMENT
}

private static SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment)
{
return new SimplePlanFragment(
planFragment.getId(),
planFragment.getRoot(),
planFragment.getVariables(),
planFragment.getPartitioning(),
planFragment.getTableScanSchedulingOrder(),
planFragment.getPartitioningScheme(),
planFragment.getStageExecutionDescriptor(),
planFragment.isOutputTableWriterFragment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.sql.planner.sanity;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.plan.PlanCheckerProvider;
import com.facebook.presto.spi.plan.PlanCheckerProviderContext;
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
Expand All @@ -40,15 +41,15 @@ public class PlanCheckerProviderManager
private static final Logger log = Logger.get(PlanCheckerProviderManager.class);
private static final String PLAN_CHECKER_PROVIDER_NAME = "plan-checker-provider.name";

private final PlanCheckerProviderContext planCheckerProviderContext;
private final SimplePlanFragmentSerde simplePlanFragmentSerde;
private final Map<String, PlanCheckerProviderFactory> providerFactories = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<PlanCheckerProvider> providers = new CopyOnWriteArrayList<>();
private final File configDirectory;

@Inject
public PlanCheckerProviderManager(SimplePlanFragmentSerde simplePlanFragmentSerde, PlanCheckerProviderManagerConfig config)
{
this.planCheckerProviderContext = new PlanCheckerProviderContext(requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null"));
this.simplePlanFragmentSerde = requireNonNull(simplePlanFragmentSerde, "planNodeSerde is null");
requireNonNull(config, "config is null");
this.configDirectory = requireNonNull(config.getPlanCheckerConfigurationDir(), "configDirectory is null");
}
Expand All @@ -61,9 +62,11 @@ public void addPlanCheckerProviderFactory(PlanCheckerProviderFactory planChecker
}
}

public void loadPlanCheckerProviders()
public void loadPlanCheckerProviders(NodeManager nodeManager)
throws IOException
{
PlanCheckerProviderContext planCheckerProviderContext = new PlanCheckerProviderContext(simplePlanFragmentSerde, nodeManager);

for (File file : listFiles(configDirectory)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
// unlike function namespaces and connectors, we don't have a concept of catalog
Expand All @@ -75,7 +78,7 @@ public void loadPlanCheckerProviders()
file.getAbsoluteFile(),
PLAN_CHECKER_PROVIDER_NAME);
String planCheckerProviderName = properties.remove(PLAN_CHECKER_PROVIDER_NAME);
log.info("-- Loading plan checker provider %s--", planCheckerProviderName);
log.info("-- Loading plan checker provider [%s] --", planCheckerProviderName);
PlanCheckerProviderFactory providerFactory = providerFactories.get(planCheckerProviderName);
checkState(providerFactory != null,
"No planCheckerProviderFactory found for '%s'. Available factories were %s", planCheckerProviderName, providerFactories.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.plan.SimplePlanFragment;
import com.facebook.presto.sql.planner.plan.JsonCodecSimplePlanFragmentSerde;
import com.facebook.presto.testing.TestingNodeManager;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

Expand All @@ -40,7 +41,7 @@ public void testLoadPlanCheckerProviders()
.setPlanCheckerConfigurationDir(new File("src/test/resources/plan-checkers"));
PlanCheckerProviderManager planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(JsonCodec.jsonCodec(SimplePlanFragment.class)), planCheckerProviderManagerConfig);
planCheckerProviderManager.addPlanCheckerProviderFactory(new TestingPlanCheckerProviderFactory());
planCheckerProviderManager.loadPlanCheckerProviders();
planCheckerProviderManager.loadPlanCheckerProviders(new TestingNodeManager());
assertEquals(planCheckerProviderManager.getPlanCheckerProviders(), ImmutableList.of(TESTING_PLAN_CHECKER_PROVIDER));
}

Expand All @@ -51,7 +52,7 @@ public void testLoadUnregisteredPlanCheckerProvider()
PlanCheckerProviderManagerConfig planCheckerProviderManagerConfig = new PlanCheckerProviderManagerConfig()
.setPlanCheckerConfigurationDir(new File("src/test/resources/plan-checkers"));
PlanCheckerProviderManager planCheckerProviderManager = new PlanCheckerProviderManager(new JsonCodecSimplePlanFragmentSerde(JsonCodec.jsonCodec(SimplePlanFragment.class)), planCheckerProviderManagerConfig);
planCheckerProviderManager.loadPlanCheckerProviders();
planCheckerProviderManager.loadPlanCheckerProviders(new TestingNodeManager());
}

public static class TestingPlanCheckerProviderFactory
Expand Down
27 changes: 27 additions & 0 deletions presto-native-sidecar-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<artifactId>log-manager</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand All @@ -77,6 +82,16 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
Expand All @@ -95,6 +110,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand Down Expand Up @@ -182,6 +203,12 @@
<artifactId>annotations</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package com.facebook.presto.sidecar;

import com.facebook.presto.sidecar.nativechecker.NativePlanCheckerProviderFactory;
import com.facebook.presto.sidecar.sessionpropertyproviders.NativeSystemSessionPropertyProviderFactory;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.plan.PlanCheckerProviderFactory;
import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory;
import com.google.common.collect.ImmutableList;

Expand All @@ -26,4 +28,19 @@ public Iterable<WorkerSessionPropertyProviderFactory> getWorkerSessionPropertyPr
{
return ImmutableList.of(new NativeSystemSessionPropertyProviderFactory());
}

@Override
public Iterable<PlanCheckerProviderFactory> getPlanCheckerProviderFactories()
{
return ImmutableList.of(new NativePlanCheckerProviderFactory(getClassLoader()));
}

private static ClassLoader getClassLoader()
{
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = NativeSidecarPlugin.class.getClassLoader();
}
return classLoader;
}
}
Loading

0 comments on commit 49c9b7e

Please sign in to comment.