New spline.pluginsEnabledByDefault property #759

Open · wants to merge 1 commit into base: develop
5 changes: 4 additions & 1 deletion README.md
@@ -698,10 +698,13 @@ When one of these commands occurs, Spline will let you know by logging a warning.
### Plugin API

Using the Plugin API, you can capture lineage from a 3rd-party data source provider.
Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
By default, Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
All you need to do is create a class that extends the `za.co.absa.spline.harvester.plugin.Plugin` marker trait,
mixed in with one or more `*Processing` traits, depending on your intention.

To disable automatic plugin discovery and speed up initialization, set `spline.pluginsEnabledByDefault` to `false` in your configuration file.
You will then need to enable each required plugin explicitly, using the `spline.plugins` configuration property.
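
For illustration, such an override might look like this in the nested style of `spline.default.yaml` (a sketch: the flattened property it corresponds to is `spline.plugins.<pluginClassName>.enabled`, and `SQLPlugin` is used only as an example, borrowed from the integration test at the bottom of this diff):

```yaml
spline:
  # Turn off classpath auto-discovery of plugins
  pluginsEnabledByDefault: false
  # Explicitly enable only the plugins you need
  plugins:
    za.co.absa.spline.harvester.plugin.embedded.SQLPlugin:
      enabled: true
```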

There are three general processing traits:

- `DataSourceFormatNameResolving` - returns a name of a data provider/format in use.
3 changes: 3 additions & 0 deletions core/src/main/resources/spline.default.yaml
@@ -259,3 +259,6 @@ spline:
- collect
- collectAsList
- toLocalIterator

# Whether plugins should be enabled by default.
pluginsEnabledByDefault: true # true | false
10 changes: 7 additions & 3 deletions core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
@@ -24,6 +24,7 @@ import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}

import scala.collection.JavaConverters._
@@ -37,7 +38,7 @@ private[spline] trait AgentBOM {
def lineageDispatcher: LineageDispatcher
def iwdStrategy: IgnoredWriteDetectionStrategy
def execPlanUUIDVersion: UUIDVersion
def pluginsConfig: Configuration
def pluginsConfig: PluginsConfiguration
}

object AgentBOM {
@@ -62,8 +63,11 @@
mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
}

override def pluginsConfig: Configuration = {
mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
override def pluginsConfig: PluginsConfiguration = {
PluginsConfiguration(
mergedConfig.getRequiredBoolean(ConfProperty.PluginsEnabledByDefault),
mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
)
}

override lazy val postProcessingFilter: Option[PostProcessingFilter] = {
15 changes: 15 additions & 0 deletions core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
@@ -81,6 +81,16 @@ object AgentConfig {
this
}

def pluginsEnabledByDefault(enabled: Boolean): this.type = synchronized {
options += ConfProperty.PluginsEnabledByDefault -> enabled
this
}

def enablePlugin(name: String): this.type = synchronized {
options += s"${ConfProperty.PluginsConfigNamespace}.$name.enabled" -> true
this
}

def build(): AgentConfig = new AgentConfig {
options.foreach(tupled(addProperty))
}
@@ -123,6 +133,11 @@
*/
val IgnoreWriteDetectionStrategy = "spline.IWDStrategy"

/**
* Whether plugins should be enabled by default
*/
val PluginsEnabledByDefault = "spline.pluginsEnabledByDefault"

val PluginsConfigNamespace = "spline.plugins"

def dispatcherClassName(logicalName: String): String = s"$RootLineageDispatcher.$logicalName.${HierarchicalObjectFactory.ClassName}"
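The new `pluginsEnabledByDefault` and `enablePlugin` builder methods support an explicit opt-in style end to end. A minimal sketch, assuming the usual `AgentConfig.builder()` entry point and mirroring the integration test at the bottom of this diff:

```scala
// Sketch: disable auto-discovery, then opt in to a single plugin
val config = AgentConfig.builder()
  .pluginsEnabledByDefault(false) // sets spline.pluginsEnabledByDefault = false
  .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin") // sets spline.plugins.<name>.enabled = true
  .build()
```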
SplineAgent.scala
@@ -30,6 +30,7 @@ import za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor
import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry
import za.co.absa.spline.harvester.postprocessing._
import za.co.absa.spline.harvester.qualifier.HDFSPathQualifier
@@ -54,7 +55,7 @@ object SplineAgent extends Logging {
)

def create(
pluginsConfig: Configuration,
pluginsConfig: PluginsConfiguration,
session: SparkSession,
lineageDispatcher: LineageDispatcher,
userPostProcessingFilter: Option[PostProcessingFilter],
PluggableDataSourceFormatResolver.scala
@@ -25,7 +25,8 @@ class PluggableDataSourceFormatResolver(pluginRegistry: PluginRegistry) extends
private val processFn =
pluginRegistry.plugins[DataSourceFormatNameResolving]
.map(_.formatNameResolver)
.reduce(_ orElse _)
.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)
.orElse[AnyRef, String] {
case dsr: DataSourceRegister => dsr.shortName
case o => o.toString
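This `reduce` → `reduceOption` change recurs in the extractors and plugins below. It matters because, with opt-in registration, a plugin list can now legitimately be empty, and `Seq.empty.reduce` throws. A minimal sketch of the pattern:

```scala
// Sketch: combining zero or more partial functions safely
val fns: Seq[PartialFunction[AnyRef, String]] = Seq.empty // e.g. every relevant plugin disabled
val combined: PartialFunction[AnyRef, String] =
  fns.reduceOption(_ orElse _)         // None when the list is empty...
     .getOrElse(PartialFunction.empty) // ...so fall back to an always-undefined PF
assert(!combined.isDefinedAt("anything")) // a plain reduce would have thrown instead
```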
PluggableReadCommandExtractor.scala
@@ -34,12 +34,14 @@ class PluggableReadCommandExtractor(
private val planProcessFn =
pluginRegistry.plugins[ReadNodeProcessing]
.map(_.readNodeProcessor)
.reduce(_ orElse _)
.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)

private val rddProcessFn =
pluginRegistry.plugins[RddReadNodeProcessing]
.map(_.rddReadNodeProcessor)
.reduce(_ orElse _)
.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)

override def asReadCommand(planOrRdd: PlanOrRdd): Option[ReadCommand] = {
val res = planOrRdd match {
PluggableWriteCommandExtractor.scala
@@ -38,7 +38,8 @@ class PluggableWriteCommandExtractor(
private val processFn: ((FuncName, LogicalPlan)) => Option[WriteNodeInfo] =
pluginRegistry.plugins[WriteNodeProcessing]
.map(_.writeNodeProcessor)
.reduce(_ orElse _)
.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)
.lift

def asWriteCommand(funcName: FuncName, logicalPlan: LogicalPlan): Option[WriteCommand] = {
PluginsConfiguration.scala (new file)
@@ -0,0 +1,24 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.harvester.plugin

import org.apache.commons.configuration.Configuration

case class PluginsConfiguration(
pluginsEnabledByDefault: Boolean,
plugins: Configuration
)
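`AgentBOM.pluginsConfig` above shows how this case class is assembled from the merged agent configuration; building one by hand is equally simple. A sketch (`com.example.MyPlugin` is a hypothetical class name, and `BaseConfiguration` comes from the same commons-configuration library imported above):

```scala
import org.apache.commons.configuration.BaseConfiguration

val plugins = new BaseConfiguration // hand-built stand-in for the "spline.plugins" subset
plugins.addProperty("com.example.MyPlugin.enabled", true) // hypothetical plugin class name
val pluginsConf = PluginsConfiguration(pluginsEnabledByDefault = false, plugins = plugins)
```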
LogicalRelationPlugin.scala
@@ -30,7 +30,8 @@ class LogicalRelationPlugin(pluginRegistry: PluginRegistry) extends Plugin with
private lazy val baseRelProcessor =
pluginRegistry.plugins[BaseRelationProcessing]
.map(_.baseRelationProcessor)
.reduce(_ orElse _)
.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)

override val readNodeProcessor: PartialFunction[LogicalPlan, ReadNodeInfo] = {
case lr: LogicalRelation
SaveIntoDataSourceCommandPlugin.scala
@@ -39,8 +39,8 @@ class SaveIntoDataSourceCommandPlugin(
private lazy val rpProcessor =
pluginRegistry.plugins[RelationProviderProcessing]
.map(_.relationProviderProcessor)
.reduce(_ orElse _)

.reduceOption(_ orElse _)
.getOrElse(PartialFunction.empty)

override def writeNodeProcessor: PartialFunction[(FuncName, LogicalPlan), WriteNodeInfo] = {
case (_, cmd: SaveIntoDataSourceCommand) => cmd match {
AutoDiscoveryPluginRegistry.scala
@@ -23,7 +23,8 @@ import org.apache.spark.internal.Logging
import za.co.absa.spline.commons.lang.ARM
import za.co.absa.spline.harvester.plugin.Plugin
import za.co.absa.spline.harvester.plugin.Plugin.Precedence
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledByDefault, EnabledConfProperty, PluginClasses, getOnlyOrThrow}
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledConfProperty, PluginClasses, getOnlyOrThrow}

import javax.annotation.Priority
import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import scala.util.Try
import scala.util.control.NonFatal

class AutoDiscoveryPluginRegistry(
conf: Configuration,
conf: PluginsConfiguration,
injectables: AnyRef*
) extends PluginRegistry
with Logging {
@@ -47,13 +48,29 @@
typedInjectables.groupBy(_._1).mapValues(_.map(_._2))
}

private val allPlugins: Seq[Plugin] =
for (pc <- PluginClasses if isPluginEnabled(pc)) yield {
private val allPlugins: Seq[Plugin] = {
val enabledPlugins = if (conf.pluginsEnabledByDefault) {
PluginClasses.filter(pc => isPluginEnabled(pc.getName))
} else {
conf.plugins.getKeys.asScala
.filter(_.endsWith(s".$EnabledConfProperty")) // Looking for keys ending with ".enabled", since plugins must be explicitly enabled
.map(_.dropRight(EnabledConfProperty.length + 1)) // Dropping ".enabled" to get plugin class name
.filter(isPluginEnabled)
.map(Class.forName)
.toSeq
}

if (enabledPlugins.isEmpty) {
throw new RuntimeException("No plugins enabled")
}

for (pc <- enabledPlugins) yield {
logInfo(s"Loading plugin: $pc")
instantiatePlugin(pc)
.recover({ case NonFatal(e) => throw new RuntimeException(s"Plugin instantiation failure: $pc", e) })
.get
}
}

override def plugins[A: ClassTag]: Seq[Plugin with A] = {
val ct = implicitly[ClassTag[A]]
@@ -65,19 +82,19 @@
val constr = getOnlyOrThrow(constrs, s"Plugin class must have a single public constructor: ${constrs.mkString(", ")}")
val args = constr.getParameterTypes.map {
case ct if classOf[Configuration].isAssignableFrom(ct) =>
conf.subset(pluginClass.getName)
conf.plugins.subset(pluginClass.getName)
case pt =>
val candidates = injectablesByType.getOrElse(pt, sys.error(s"Cannot bind $pt. No value found"))
getOnlyOrThrow(candidates, s"Ambiguous constructor parameter binding. Multiple values found for $pt: ${candidates.length}")
}
constr.newInstance(args: _*).asInstanceOf[Plugin]
}

private def isPluginEnabled(pc: Class[Plugin]): Boolean = {
val pluginConf = conf.subset(pc.getName)
val isEnabled = pluginConf.getBoolean(EnabledConfProperty, EnabledByDefault)
private def isPluginEnabled(pcn: String): Boolean = {
val pluginConf = conf.plugins.subset(pcn)
val isEnabled = pluginConf.getBoolean(EnabledConfProperty, conf.pluginsEnabledByDefault)
if (!isEnabled) {
logWarning(s"Plugin ${pc.getName} is disabled in the configuration.")
logWarning(s"Plugin ${pcn} is disabled in the configuration.")
}
isEnabled
}
@@ -87,7 +104,6 @@
object AutoDiscoveryPluginRegistry extends Logging {

private val EnabledConfProperty = "enabled"
private val EnabledByDefault = true

private val PluginClasses: Seq[Class[Plugin]] = {
logDebug("Scanning for plugins")
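Two details of the new registry logic are easy to miss: in opt-in mode the plugin class name is recovered by trimming the `.enabled` suffix from each flattened configuration key, and `isPluginEnabled` now defaults to `pluginsEnabledByDefault` rather than the removed hard-coded `EnabledByDefault = true`. A sketch of the suffix arithmetic:

```scala
// Sketch: mapping an ".enabled" key back to a plugin class name
val EnabledConfProperty = "enabled"
val key = "za.co.absa.spline.harvester.plugin.embedded.SQLPlugin.enabled"
val className = key.dropRight(EnabledConfProperty.length + 1) // "+ 1" also drops the dot
assert(className == "za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
```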
AgentConfigSpec.scala
@@ -111,13 +111,15 @@ class AgentConfigSpec
.lineageDispatcher(mockDispatcher)
.postProcessingFilter(mockFilter)
.ignoredWriteDetectionStrategy(mockIwdStrategy)
.pluginsEnabledByDefault(false)
.build()

config should not be empty
config.getKeys.asScala should have length 3
config.getKeys.asScala should have length 4
config.getProperty(ConfProperty.RootLineageDispatcher) should be theSameInstanceAs mockDispatcher
config.getProperty(ConfProperty.RootPostProcessingFilter) should be theSameInstanceAs mockFilter
config.getProperty(ConfProperty.IgnoreWriteDetectionStrategy) should be theSameInstanceAs mockIwdStrategy
config.getProperty(ConfProperty.PluginsEnabledByDefault) shouldBe false
}

}
SplineFixture.scala
@@ -16,11 +16,12 @@
package za.co.absa.spline.test.fixture.spline

import org.apache.spark.sql.SparkSession
import za.co.absa.spline.agent.AgentConfig


trait SplineFixture {
def withLineageTracking[T](testBody: LineageCaptor => T)(implicit session: SparkSession): T = {
testBody(new LineageCaptor)
def withLineageTracking[T](testBody: LineageCaptor => T, builderCustomizer: AgentConfig.Builder => AgentConfig.Builder = identity)(implicit session: SparkSession): T = {
testBody(new LineageCaptor(builderCustomizer))
}
}

BasicIntegrationTests.scala
@@ -40,7 +40,7 @@ class BasicIntegrationTests extends AsyncFlatSpec

"saveAsTable" should "process all operations" in
withNewSparkSession(implicit spark =>
withLineageTracking { captor =>
withLineageTracking({ captor =>
import spark.implicits._

withNewSparkSession {
@@ -57,7 +57,11 @@
plan.operations.other should have length 2
plan.operations.write should not be null
}
}
}, {
// To enable the SQL plugin only
_.pluginsEnabledByDefault(false)
.enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
})
)

"save_to_fs" should "process all operations" in