From 4ac241e237b479c2fcae3d1dd580603ab5d36ccf Mon Sep 17 00:00:00 2001 From: Sidharth Bolar Date: Sun, 21 Aug 2022 20:06:41 +1000 Subject: [PATCH 1/2] external rules loaded as json with UT --- .gitignore | 3 + README.md | 12 ++ build.sbt | 9 + .../labs/validation/RuleParser.scala | 197 ++++++++++++++++++ .../labs/validation/utils/Structures.scala | 13 +- src/test/resources/rules.json | 53 +++++ .../labs/validation/RuleParserTestSuite.scala | 102 +++++++++ .../labs/validation/RuleTestSuite.scala | 2 +- 8 files changed, 387 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/com/databricks/labs/validation/RuleParser.scala create mode 100644 src/test/resources/rules.json create mode 100644 src/test/scala/com/databricks/labs/validation/RuleParserTestSuite.scala diff --git a/.gitignore b/.gitignore index 7f7ecc2..e0137e9 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ *.iml #local spark context data from unit tests spark-warehouse/ +#scala worksheets +*.sc #Build dirctory for maven/sbt target/ @@ -11,3 +13,4 @@ project/target/ /target/ /project/build.properties /src/main/scala/com/databricks/labs/validation/LocalTest.scala +.bsp/sbt.json diff --git a/README.md b/README.md index 9a04923..0a4fcb0 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,18 @@ val specializedRules = Array( ) RuleSet(df, by = "store").add(specializedRules) ``` + +## List of Rules as JSON +An array of list of rules can be initialised from an external file containing valid json +Pass the Json as String as following +```scala +val jsonParserInstance = new JsonRuleParser() +val rulesArray = jsonParserInstance.parseRules(jsonString) +``` +The Array of Rules can then be used to in your Rule Set +Note: Currently this interface does not support MinMaxRule initialisation but will +be added in the next iteration + Common Real World Example ```scala case class GlobalRules(regionID: Int, bu: String, subOrg: String, rules: Array[Rule]*) diff --git a/build.sbt b/build.sbt index cac2193..1932acc 100644 --- a/build.sbt +++ b/build.sbt @@ -25,6 +25,15 @@ libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1" % Provided libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1" % Provided libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.6" % Test + +val circeVersion = "0.14.1" + +libraryDependencies ++= Seq( + "io.circe" %% "circe-core", + "io.circe" %% "circe-generic", + "io.circe" %% "circe-parser" +).map(_ % circeVersion) + run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run)).evaluated runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run)).evaluated diff --git a/src/main/scala/com/databricks/labs/validation/RuleParser.scala b/src/main/scala/com/databricks/labs/validation/RuleParser.scala new file mode 100644 index 0000000..05cf6c1 --- /dev/null +++ b/src/main/scala/com/databricks/labs/validation/RuleParser.scala @@ -0,0 +1,197 @@ +package com.databricks.labs.validation + +import io.circe.{Decoder, Json, parser} +import com.databricks.labs.validation.utils.Structures.Bounds +import org.apache.spark.sql.functions.col + +import scala.io.Source + + + + +sealed trait RuleParser{ + /** + * Define trait to enable extension of generic Rule Parser to provide a body support multiple input types + * */ + + val parserType:String + /** Identifier to be implemented by child class specifying format it supports*/ + def readRules(filePath:String):String + /** @param filePath The specific path where the json file containing the rules reside */ + def parseRules(rules:String):Array[Rule] + /** @param rules input object of genric Type T containing the parsed rules + * returns Array of Individual Rules specified in JSON object*/ +} + + + + +class JsonRuleParser extends RuleParser { + /** + * Implementation of RuleParser to support external rules in JSON Format + * + * + * */ + override final val parserType ="jsonParser" + def parseRules(rules:String):Array[Rule] = { + if (rules == null) { + val jsonRules = parser.decode[Array[Rule]](rules).right.get + jsonRules + } + else{ + val jsonRules = parser.decode[Array[Rule]](readRules()).right.get + jsonRules + } + } + + def readRules(filePath:String="rules.json"):String = { + val jsonRuleString: String = Source.fromResource(filePath).mkString + jsonRuleString + } + +/** + * Implicit decoder types needed by CIRCE lib to parse individual json items to be parsed to the supported Rule objects + * */ + + private val _boundsDecoder:Decoder[Bounds]={ + boundCursor => + for { + lower <- boundCursor.get[Double]("lower") + upper <- boundCursor.get[Double]("upper") + lowerInclusive <- boundCursor.getOrElse[Boolean]("lowerInclusive")(false) + upperInclusive <- boundCursor.getOrElse[Boolean]("upperInclusive")(false) + } yield Bounds(lower , upper , lowerInclusive , upperInclusive ) + } + + private val _ruleDecoder1:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + bounds <- ruleCursor.get[Bounds]("Bounds") + } yield Rule(ruleName, column ,bounds) + + } + + private val _ruleDecoder2:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + } yield Rule(ruleName, column) + } + + private val _ruleDecoder3:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validExpr <- ruleCursor.get[String]("validExpr").map(x => col(x)) + } yield Rule(ruleName , column ,validExpr) + } + + private val _ruleDecoder4:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Double]]("validNumerics") + invertMatch <- ruleCursor.get[Boolean]("invertMatch") + } yield Rule(ruleName , column ,validNumerics,invertMatch) + } + + private val _ruleDecoder5a:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Double]]("validNumerics") + + } yield Rule(ruleName , column ,validNumerics) + + } + + private val _ruleDecoder5b:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Long]]("validNumerics") + + } yield Rule(ruleName , column ,validNumerics) + + } + + private val _ruleDecoder5c:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Int]]("validNumerics") + + } yield Rule(ruleName , column ,validNumerics) + + } + + private val _ruleDecoder5d1:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Double]]("validNumerics") + invertMatch <- ruleCursor.getOrElse[Boolean]("invertMatch")(false) + + } yield Rule(ruleName , column ,validNumerics, invertMatch) + + } + + private val _ruleDecoder5d2:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validNumerics <- ruleCursor.get[Array[Int]]("validNumerics") + invertMatch <- ruleCursor.getOrElse[Boolean]("invertMatch")(false) + + } yield Rule(ruleName , column ,validNumerics, invertMatch) + + } + + private val _ruleDecoder6:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validString <- ruleCursor.get[Array[String]]("validString") + ignoreCase <- ruleCursor.getOrElse[Boolean]("ignoreCase")(false) + invertMatch <- ruleCursor.getOrElse[Boolean]("invertMatch")(false) + } yield Rule(ruleName , column ,validString,ignoreCase,invertMatch) + } + + private val _ruleDecoder7:Decoder[Rule]={ + ruleCursor => + for { + ruleName <- ruleCursor.get[String]("ruleName") + column <- ruleCursor.get[String]("column").map(x => col(x)) + validString <- ruleCursor.get[Array[String]]("validString") + } yield Rule(ruleName , column ,validString) + } + + implicit val boundsDecoder: Decoder[Bounds] = _boundsDecoder + implicit val ruleDecoder: Decoder[Rule] = { + + _ruleDecoder3 or + _ruleDecoder4 or + _ruleDecoder5a or + _ruleDecoder5b or + _ruleDecoder5c or + _ruleDecoder5d1 or + _ruleDecoder5d2 or + _ruleDecoder6 or + _ruleDecoder7 or + _ruleDecoder1 or + _ruleDecoder2 + } + + +} diff --git a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala index 113c55d..743ab06 100644 --- a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala +++ b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala @@ -1,6 +1,7 @@ package com.databricks.labs.validation.utils import com.databricks.labs.validation.Rule +import com.sun.javafx.binding.SelectBinding.AsInteger import org.apache.spark.sql.{Column, DataFrame} /** @@ -22,21 +23,25 @@ object Lookups { object Structures { case class Bounds( - lower: Double = Double.NegativeInfinity, - upper: Double = Double.PositiveInfinity, + val lower: Double = Double.NegativeInfinity, + val upper: Double = Double.PositiveInfinity, lowerInclusive: Boolean = false, upperInclusive: Boolean = false) { def validationLogic(c: Column): Column = { val lowerLogic = if (lowerInclusive) c >= lower else c > lower val upperLogic = if (upperInclusive) c <= upper else c < upper lowerLogic && upperLogic - } + } } + + + case class MinMaxRuleDef(ruleName: String, column: Column, bounds: Bounds, by: Column*) case class ValidationResults(completeReport: DataFrame, summaryReport: DataFrame) + private[validation] class ValidationException(s: String) extends Exception(s) {} private[validation] class InvalidRuleException(r: Rule, s: String) extends Exception(s) { @@ -44,4 +49,6 @@ object Structures { throw new ValidationException(msg) } + + } diff --git a/src/test/resources/rules.json b/src/test/resources/rules.json new file mode 100644 index 0000000..809add7 --- /dev/null +++ b/src/test/resources/rules.json @@ -0,0 +1,53 @@ +[ + { + "ruleName":"ImplicitCoolingExpr", + "column":"booleanisithot" + }, + { + "ruleName":"HeatingRateIntRulewith2Bounds", + "column":"heatingrate-coolingrate", + "Bounds":{ + "lower":"0.01", + "upper":"1000.0", + "lowerInclusive" : true, + "upperInclusive" : true + } + }, + { + "ruleName":"HeatingRateIntRulewith1bounds", + "column":"heatingrate-coolingrate", + "Bounds":{ + "lower":"0.01", + "upper":"1000.0", + "lowerInclusive" : true + } + }, + { + "ruleName":"HeatingRateIntRulewithCategoryLookup", + "column":"heatingrate-coolingrate", + "validString":["Good,Bad"] + }, + { + "ruleName":"HeatingRateIntRulewithCategoryLookupIgnorecase", + "column":"heatingrate-coolingrate", + "validString":["Good,Bad"], + "ignoreCase":true + }, + { + "ruleName":"HeatingRateIntRulewithCategoryLookupInvertmatch", + "column":"heatingrate-coolingrate", + "validString":["Good,Bad"], + "invertMatch":true + }, + { + "ruleName":"HeatingRateIntRulewithCategoryLookupInt", + "column":"heatingrate-coolingrate", + "validNumerics":[1,10,100,1000] + }, + { + "ruleName":"HeatingRateIntRulewithCategoryLookupIntInvertTrue", + "column":"heatingrate-coolingrate", + "validNumerics":[1,10,100,1000], + "invertMatch":true + } +] \ No newline at end of file diff --git a/src/test/scala/com/databricks/labs/validation/RuleParserTestSuite.scala b/src/test/scala/com/databricks/labs/validation/RuleParserTestSuite.scala new file mode 100644 index 0000000..ed7eb06 --- /dev/null +++ b/src/test/scala/com/databricks/labs/validation/RuleParserTestSuite.scala @@ -0,0 +1,102 @@ +package com.databricks.labs.validation + +import com.databricks.labs.validation.utils.Structures.Bounds +import com.databricks.labs.validation.{JsonRuleParser, Rule} +import org.apache.spark.sql.functions.{col, expr} +import org.scalatest.funsuite.AnyFunSuite + +import scala.io.Source + +class RuleParserTestSuite extends AnyFunSuite with SparkSessionFixture{ + + test("Input json is Read Correctly with expected columns"){ + + val jsonParserInstance = new JsonRuleParser() + val parsedString=jsonParserInstance.readRules() + println(parsedString) + assert (parsedString.isInstanceOf[String]) + } + + + + +test("Json is parsed to Array of Rules"){ + + val jsonParserInstance = new JsonRuleParser() + val parsedString=jsonParserInstance.readRules() + val rulesArray = jsonParserInstance.parseRules(parsedString) + + assert (rulesArray.isInstanceOf[Array[Rule]]) + assert(rulesArray.length===8) +} + + test("Json of Rules is initialised to expected rules"){ + val jsonParserInstance = new JsonRuleParser() + val parsedString=jsonParserInstance.readRules() + val rulesArray = jsonParserInstance.parseRules(parsedString) + + //Rule 1 - Implicit Boolean Rule + + val testRule1 = Rule("ImplicitCoolingExpr",col("booleanisithot")) + print(rulesArray(0)) + print(testRule1) + assert(rulesArray(0).ruleName===testRule1.ruleName) + assert(rulesArray(0).inputColumnName===testRule1.inputColumnName) + + //Rule 2a - Rule with Bounds with lower and upper inclusive set + + val testRule2 = Rule("HeatingRateIntRulewith2Bounds",col("heatingrate-coolingrate"),Bounds(0.01, 1000.0,lowerInclusive=true,upperInclusive=true) ) + print(rulesArray(1)) + print(testRule2) + assert(rulesArray(1).ruleName===testRule2.ruleName) + assert(rulesArray(1).inputColumnName===testRule2.inputColumnName) + assert( + (rulesArray(1).boundaries.lower==testRule2.boundaries.lower) && + (rulesArray(1).boundaries.upper==testRule2.boundaries.upper) && + (rulesArray(1).boundaries.lowerInclusive==testRule2.boundaries.lowerInclusive) && + (rulesArray(1).boundaries.upperInclusive==testRule2.boundaries.upperInclusive),"Boundary is not set up correctly" + ) + + //Rule 2b - Rule with Bounds with lower inclusive set + + val testRule3 = Rule("HeatingRateIntRulewith1bounds",col("heatingrate-coolingrate"),Bounds(0.01, 1000.0,lowerInclusive=true) ) + print(rulesArray(2)) + print(testRule3) + assert(rulesArray(2).ruleName===testRule3.ruleName,"Rule Name not set correctly") + assert(rulesArray(2).inputColumnName===testRule3.inputColumnName) + assert( + (rulesArray(2).boundaries.lower==testRule3.boundaries.lower) && + (rulesArray(2).boundaries.upper==testRule3.boundaries.upper) && + (rulesArray(2).boundaries.lowerInclusive==testRule3.boundaries.lowerInclusive) && + (rulesArray(2).boundaries.upperInclusive==testRule3.boundaries.upperInclusive),"Boundary is not set up correctly" + ) + + + val testRule4 = Rule("HeatingRateIntRulewithCategoryLookup",col("heatingrate-coolingrate"),Array("Good","Bad") ) + print(rulesArray(3)) + print(testRule4) + assert(rulesArray(3).ruleName===testRule4.ruleName) + assert(rulesArray(3).inputColumnName===testRule4.inputColumnName) + assert(rulesArray(3).ruleType == RuleType.ValidateStrings,"Rule Type is not set up correctly") + + val testRule5 = Rule("HeatingRateIntRulewithCategoryLookupIgnorecase",col("heatingrate-coolingrate"),Array("Good","Bad"),ignoreCase = true ) + print(rulesArray(4)) + print(testRule5) + assert(rulesArray(4).ruleName===testRule5.ruleName) + assert(rulesArray(4).inputColumnName===testRule5.inputColumnName) + assert(rulesArray(4).ruleType == RuleType.ValidateStrings,"Rule Type is not set up correctly") + + val testRule6 = Rule("HeatingRateIntRulewithCategoryLookupInt",col("heatingrate-coolingrate"),Array(1,10,100,1000)) + print(rulesArray(6)) + print(testRule6) + assert(rulesArray(6).ruleName===testRule6.ruleName) + assert(rulesArray(6).inputColumnName===testRule6.inputColumnName) + assert(rulesArray(6).ruleType == RuleType.ValidateNumerics,"Rule Type is not set up correctly") + + + } + + + + +} diff --git a/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala b/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala index 16cdd6c..1c9eb69 100644 --- a/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala +++ b/src/test/scala/com/databricks/labs/validation/RuleTestSuite.scala @@ -108,4 +108,4 @@ class RuleTestSuite extends AnyFunSuite with SparkSessionFixture { } -} +} \ No newline at end of file From 4d9b21aa2cad9d72eb9ad4aec1522448d0876c0e Mon Sep 17 00:00:00 2001 From: Sidharth Bolar Date: Sun, 21 Aug 2022 20:28:46 +1000 Subject: [PATCH 2/2] revert changes to Structures --- .../databricks/labs/validation/utils/Structures.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala index 743ab06..578bfde 100644 --- a/src/main/scala/com/databricks/labs/validation/utils/Structures.scala +++ b/src/main/scala/com/databricks/labs/validation/utils/Structures.scala @@ -1,7 +1,6 @@ package com.databricks.labs.validation.utils import com.databricks.labs.validation.Rule -import com.sun.javafx.binding.SelectBinding.AsInteger import org.apache.spark.sql.{Column, DataFrame} /** @@ -23,8 +22,8 @@ object Lookups { object Structures { case class Bounds( - val lower: Double = Double.NegativeInfinity, - val upper: Double = Double.PositiveInfinity, + lower: Double = Double.NegativeInfinity, + upper: Double = Double.PositiveInfinity, lowerInclusive: Boolean = false, upperInclusive: Boolean = false) { def validationLogic(c: Column): Column = { @@ -34,14 +33,10 @@ object Structures { } } - - - case class MinMaxRuleDef(ruleName: String, column: Column, bounds: Bounds, by: Column*) case class ValidationResults(completeReport: DataFrame, summaryReport: DataFrame) - private[validation] class ValidationException(s: String) extends Exception(s) {} private[validation] class InvalidRuleException(r: Rule, s: String) extends Exception(s) { @@ -49,6 +44,4 @@ object Structures { throw new ValidationException(msg) } - - }