From fbad63d7724bcf066888a4e5e574d865cb4f4744 Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Wed, 19 Feb 2020 12:18:21 -0800
Subject: [PATCH] Spark 2.4 support (#402)

* Revert "Revert back to Spark 2.3 (#399)". This reverts commit 95a77b17269a71bf0d53c54df7d76f0bfe862275.
* Update to Spark 2.4.3 and XGBoost 0.90
* special double serializer fix
* fix serialization
* fix serialization
* docs
* fixed missing value for test
* meta fix
* Updated DecisionTreeNumericMapBucketizer test to deal with the change made to decision tree pruning in Spark 2.4: if nodes are split but both child nodes lead to the same prediction, then the split is pruned away. This updates the test so this doesn't happen for feature 'b'.
* fix params meta test
* Fixed failing xgboost test
* ident
* cleanup
* added dataframe reader and writer extensions (usage sketch below)
* added const
* cherrypick fixes
* added xgboost params + update models to use public predict method
* blarg
* double ser test
* update mleap and spark-testing-base
* Update README.md
* type fix
* bump minor version
* Update Spark version in the README
* bump version
* Update build.gradle
* Update pom.xml
* set correct json4s version
* upgrade helloworld deps
* upgrade notebook deps on TMog and Spark
* bump to version 0.7.0 for Spark update
* align helloworld dependencies
* align helloworld dependencies
* get -> getOrElse with exception
* fix helloworld compilation
* Spark 2.4.5
* Spark 2.4.5
* Spark 2.4.5
* Update OpTitanicSimple.ipynb
* Update OpIris.ipynb
* Revert "Spark 2.4.5". This reverts commit b3c0a7452d6478dbd05e52358ffbd05a0081cd8a.
* Revert "Spark 2.4.5". This reverts commit f4ab3fda85ab8d62d2c9f182a53d30a67baecaac.
* Revert "Spark 2.4.5". This reverts commit 50d9dfb98398039a6c6e46f9c71f0f8a92baabab.
* Revert "Update OpTitanicSimple.ipynb". This reverts commit 341797238ecc039d5d5810c4244ccecf14080c3d.
* Revert "Update OpIris.ipynb". This reverts commit df38bccff49d91698688bc5e7fa55d733aae0450.
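Context for the "dataframe reader and writer extensions" bullet above: the diff below replaces the com.databricks:spark-avro dependency with Spark's built-in spark-avro module and adds `avro` shortcuts on `DataFrameReader`/`DataFrameWriter` in `com.salesforce.op.utils.spark.RichDataset`. A minimal usage sketch under those assumptions (the file paths are hypothetical, and spark-avro must be on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import com.salesforce.op.utils.spark.RichDataset._

val spark = SparkSession.builder().appName("avro-io-sketch").getOrCreate()

// Read Avro files via the new extension, equivalent to spark.read.format("avro").load(path)
val passengers = spark.read.avro("data/passengers.avro")

// Write back out in Avro format, equivalent to df.write.format("avro").save(path)
passengers.write.avro("tmp/passengers-copy.avro")
```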
Co-authored-by: Christopher Suchanek Co-authored-by: Kevin Moore Co-authored-by: Nico de Vos --- README.md | 2 +- build.gradle | 22 +++---- cli/build.gradle | 6 +- .../com/salesforce/op/cli/SchemaSource.scala | 2 +- .../salesforce/op/cli/gen/AvroFieldTest.scala | 4 +- .../impl/classification/OpLinearSVC.scala | 4 +- .../classification/OpXGBoostClassifier.scala | 26 +++++++- .../regression/OpDecisionTreeRegressor.scala | 6 +- .../impl/regression/OpGBTRegressor.scala | 6 +- .../impl/regression/OpLinearRegression.scala | 6 +- .../regression/OpRandomForestRegressor.scala | 7 +- .../impl/regression/OpXGBoostRegressor.scala | 26 ++++++-- .../stages/impl/selector/ModelSelector.scala | 1 - .../specific/OpPredictionModel.scala | 9 ++- .../xgboost4j/scala/spark/XGBoostParams.scala | 6 +- .../com/salesforce/op/ModelInsightsTest.scala | 10 +-- .../OpPipelineStageReaderWriterTest.scala | 17 +++-- .../OpClassifierModelTest.scala | 15 ++--- ...DecisionTreeNumericMapBucketizerTest.scala | 4 +- .../op/stages/OpPipelineStageReader.scala | 7 +- .../stages/OpPipelineStageReaderWriter.scala | 3 + .../op/utils/spark/RichDataset.scala | 33 +++++++++- .../ml/SparkDefaultParamsReadWrite.scala | 65 ++++++++++++++----- gradle.properties | 2 +- helloworld/build.gradle | 6 +- helloworld/notebooks/OpHousingPrices.ipynb | 2 +- helloworld/notebooks/OpIris.ipynb | 4 +- helloworld/notebooks/OpTitanicSimple.ipynb | 4 +- pom.xml | 30 +++------ .../op/readers/CSVAutoReaders.scala | 12 ++-- .../datasources/csv/CSVSchemaUtils.scala | 12 ++-- templates/simple/build.gradle.template | 10 +-- utils/build.gradle | 2 +- .../op/utils/io/csv/CSVToAvro.scala | 2 +- .../utils/json/SpecialDoubleSerializer.scala | 7 +- .../json/SpecialDoubleSerializerTest.scala | 34 ++++++---- 36 files changed, 253 insertions(+), 161 deletions(-) diff --git a/README.md b/README.md index 493acbf86d..3d69bdec03 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Start by picking TransmogrifAI version to match your project dependencies from t | TransmogrifAI Version | Spark Version | Scala Version | Java Version | |-------------------------------------------------------|:-------------:|:-------------:|:------------:| -| 0.6.2 (unreleased, master) | 2.3 | 2.11 | 1.8 | +| 0.7.0 (unreleased, master) | 2.4 | 2.11 | 1.8 | | **0.6.1 (stable)**, 0.6.0, 0.5.3, 0.5.2, 0.5.1, 0.5.0 | **2.3** | **2.11** | **1.8** | | 0.4.0, 0.3.4 | 2.2 | 2.11 | 1.8 | diff --git a/build.gradle b/build.gradle index f1ec9672ae..990d6e0f0c 100644 --- a/build.gradle +++ b/build.gradle @@ -1,15 +1,16 @@ buildscript { repositories { - maven { url "https://plugins.gradle.org/m2/" } mavenCentral() + jcenter() + maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'org.scoverage' version '2.5.0' id 'net.minecrell.licenser' version '0.4.1' id 'com.github.jk1.dependency-license-report' version '0.5.0' @@ -57,14 +58,13 @@ configure(allProjs) { scalaVersionRevision = '12' scalaTestVersion = '3.0.5' scalaCheckVersion = '1.14.0' - junitVersion = '4.11' - avroVersion = '1.7.7' - sparkVersion = '2.3.2' - sparkAvroVersion = '4.0.0' + junitVersion = '4.12' + avroVersion = '1.8.2' + sparkVersion = '2.4.4' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' - json4sVersion = '3.2.11' // matches Spark dependency version + json4sVersion = '3.5.3' 
// matches Spark dependency version jodaTimeVersion = '2.9.4' jodaConvertVersion = '1.8.1' algebirdVersion = '0.13.4' @@ -75,20 +75,20 @@ configure(allProjs) { googleLibPhoneNumberVersion = '8.8.5' googleGeoCoderVersion = '2.82' googleCarrierVersion = '1.72' - chillVersion = '0.8.4' + chillVersion = '0.9.3' reflectionsVersion = '0.9.11' collectionsVersion = '3.2.2' optimaizeLangDetectorVersion = '0.0.1' tikaVersion = '1.22' - sparkTestingBaseVersion = '2.3.1_0.10.0' + sparkTestingBaseVersion = '2.4.3_0.12.0' sourceCodeVersion = '0.1.3' pegdownVersion = '1.4.2' commonsValidatorVersion = '1.6' commonsIOVersion = '2.6' scoveragePluginVersion = '1.3.1' - xgboostVersion = '0.81' + xgboostVersion = '0.90' akkaSlf4jVersion = '2.3.11' - mleapVersion = '0.13.0' + mleapVersion = '0.14.0' memoryFilesystemVersion = '2.1.0' } diff --git a/cli/build.gradle b/cli/build.gradle index 3062917744..3d6e9ffd9c 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -69,7 +69,6 @@ task copyTemplates(type: Copy) { fileName.replace(".gradle.template", ".gradle") } expand([ - databaseHostname: 'db.company.com', version: scalaVersion, scalaVersion: scalaVersion, scalaVersionRevision: scalaVersionRevision, @@ -77,12 +76,9 @@ task copyTemplates(type: Copy) { junitVersion: junitVersion, sparkVersion: sparkVersion, avroVersion: avroVersion, - sparkAvroVersion: sparkAvroVersion, hadoopVersion: hadoopVersion, collectionsVersion: collectionsVersion, - transmogrifaiVersion: version, - buildNumber: (int)(Math.random() * 1000), - date: new Date() + transmogrifaiVersion: version ]) } diff --git a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala index ee863c8a81..5ad27f866b 100644 --- a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala +++ b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala @@ -138,7 +138,7 @@ case class AutomaticSchema(recordClassName: String)(dataFile: File) extends Sche case Some(actualType) => val newSchema = Schema.create(actualType) val schemaField = - new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultValue) + new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultVal()) AvroField.from(schemaField) } } else field diff --git a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala index 80bce98cab..38a686a711 100644 --- a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala +++ b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala @@ -69,7 +69,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { val allSchemas = (enum::unions)++simpleSchemas // NULL does not work val fields = allSchemas.zipWithIndex map { - case (s, i) => new Schema.Field("x" + i, s, "Who", null) + case (s, i) => new Schema.Field("x" + i, s, "Who", null: Object) } val expected = List( @@ -86,7 +86,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { an[IllegalArgumentException] should be thrownBy { val nullSchema = Schema.create(Schema.Type.NULL) - val nullField = new Schema.Field("xxx", null, "Nobody", null) + val nullField = new Schema.Field("xxx", null, "Nobody", null: Object) AvroField from nullField } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala index 425d43a866..1275e3d163 100644 --- 
a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala @@ -152,14 +152,14 @@ class OpLinearSVCModel ) extends OpPredictorWrapperModel[LinearSVCModel](uid = uid, operationName = operationName, sparkModel = sparkModel) { @transient lazy private val predictRaw = reflectMethod(getSparkMlStage().get, "predictRaw") - @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy private val predict: Vector => Double = getSparkMlStage().get.predict(_) /** * Function used to convert input to output */ override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { val raw = predictRaw(features.value).asInstanceOf[Vector] - val pred = predict(features.value).asInstanceOf[Double] + val pred = predict(features.value) Prediction(rawPrediction = raw, prediction = pred) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala index 30f9801fb4..3c07a58787 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala @@ -235,6 +235,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + /** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -282,8 +287,19 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -310,6 +326,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. 
default: null */ @@ -359,17 +380,18 @@ class OpXGBoostClassificationModel private lazy val model = getSparkMlStage().get private lazy val booster = model.nativeBooster - private lazy val treeLimit = model.getTreeLimit.toInt + private lazy val treeLimit = model.getTreeLimit private lazy val missing = model.getMissing override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { - val data = removeMissingValues(Iterator(features.value.asXGB), missing) + val data = processMissingValues(Iterator(features.value.asXGB), missing) val dm = new DMatrix(dataIter = data) val rawPred = booster.predict(dm, outPutMargin = true, treeLimit = treeLimit)(0).map(_.toDouble) val rawPrediction = if (model.numClasses == 2) Array(-rawPred(0), rawPred(0)) else rawPred val prob = booster.predict(dm, outPutMargin = false, treeLimit = treeLimit)(0).map(_.toDouble) val probability = if (model.numClasses == 2) Array(1.0 - prob(0), prob(0)) else prob val prediction = probability2predictionMirror(Vectors.dense(probability)).asInstanceOf[Double] + Prediction(prediction = prediction, rawPrediction = rawPrediction, probability = probability) } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala index f42e1e50ed..39a5735949 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor, OpDecisionTreeRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -113,7 +112,4 @@ class OpDecisionTreeRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[DecisionTreeRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala index a8d69c9f14..b5717b49a4 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor, OpGBTRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -139,7 +138,4 @@ class OpGBTRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[GBTRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, 
"predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala index 569e81953a..863be31c60 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala @@ -34,7 +34,6 @@ import com.salesforce.op._ import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, OpLinearRegressionParams} import scala.reflect.runtime.universe.TypeTag @@ -205,7 +204,4 @@ class OpLinearRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[LinearRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala index 4b1aca8265..4b0fdbd1d5 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{OpRandomForestRegressorParams, RandomForestRegressionModel, RandomForestRegressor} import scala.reflect.runtime.universe.TypeTag @@ -126,8 +125,4 @@ class OpRandomForestRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[RandomForestRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala index 688f34f812..8e2eaaf49d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait} import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostRegressorParams, TrackerConf, XGBoostRegressionModel, XGBoostRegressor} @@ -234,6 +233,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + 
/** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -281,8 +285,19 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -309,6 +324,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. default: null */ @@ -341,6 +361,4 @@ class OpXGBoostRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[XGBoostRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala index a7e5dbac68..f32aae608e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala @@ -41,7 +41,6 @@ import com.salesforce.op.stages.impl.tuning.{BestEstimator, _} import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapperModel, SparkModelConverter} import com.salesforce.op.utils.spark.RichMetadata._ -import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichParamMap._ import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.spark.ml.param._ diff --git a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala index cfcaae7278..54b7777926 100644 --- a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala +++ b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala @@ -52,9 +52,12 @@ abstract class OpPredictionModel[T <: PredictionModel[Vector, T]] operationName: String ) extends OpPredictorWrapperModel[T](uid = uid, operationName = operationName, sparkModel = sparkModel) { - protected def predictMirror: MethodMirror - - protected def 
predict(features: Vector): Double = predictMirror.apply(features).asInstanceOf[Double] + /** + * Predict label for the given features + */ + @transient protected lazy val predict: Vector => Double = getSparkMlStage().getOrElse( + throw new RuntimeException(s"Could not find the wrapped Spark stage.") + ).predict(_) /** * Function used to convert input to output diff --git a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala index 88077fa63f..3c8959997d 100644 --- a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala +++ b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala @@ -104,8 +104,8 @@ case object OpXGBoost { } /** - * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.removeMissingValues]] private method + * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.processMissingValues]] private method */ - def removeMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = - XGBoost.removeMissingValues(xgbLabelPoints, missing) + def processMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = + XGBoost.processMissingValues(xgbLabelPoints, missing) } diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala index 50f5f3b392..7412ba5c7d 100644 --- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala +++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala @@ -45,19 +45,20 @@ import com.salesforce.op.stages.impl.tuning.{DataCutter, DataSplitter} import com.salesforce.op.test.{PassengerSparkFixtureTest, TestFeatureBuilder} import com.salesforce.op.testkit.RandomReal import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} -import com.twitter.algebird.Moments +import ml.dmlc.xgboost4j.scala.spark.OpXGBoostQuietLogging import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.junit.runner.RunWith +import com.twitter.algebird.Moments import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import scala.util.{Failure, Success} @RunWith(classOf[JUnitRunner]) -class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality { +class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality with OpXGBoostQuietLogging { private val density = weight / height private val generVec = genderPL.vectorize(topK = 10, minSupport = 1, cleanText = true) @@ -74,8 +75,8 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou val lrParams = new ParamGridBuilder().addGrid(lr.regParam, Array(0.01, 0.1)).build() val models = Seq(lr -> lrParams).asInstanceOf[Seq[(EstimatorType, Array[ParamMap])]] - val xgbClassifier = new OpXGBoostClassifier().setSilent(1).setSeed(42L) - val xgbRegressor = new OpXGBoostRegressor().setSilent(1).setSeed(42L) + val xgbClassifier = new OpXGBoostClassifier().setMissing(0.0f).setSilent(1).setSeed(42L) + val xgbRegressor = new OpXGBoostRegressor().setMissing(0.0f).setSilent(1).setSeed(42L) val xgbClassifierPred = xgbClassifier.setInput(label, features).getOutput() val xgbRegressorPred = xgbRegressor.setInput(label, features).getOutput() lazy val xgbWorkflow = @@ -445,7 +446,6 @@ class ModelInsightsTest extends FlatSpec with 
PassengerSparkFixtureTest with Dou } it should "correctly serialize and deserialize from json when raw feature filter is used" in { - val insights = modelWithRFF.modelInsights(predWithMaps) ModelInsights.fromJson(insights.toJson()) match { case Failure(e) => fail(e) diff --git a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala index 8791277d9e..50019561a2 100644 --- a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala @@ -70,8 +70,8 @@ private[stages] abstract class OpPipelineStageReaderWriterTest it should "write class name" in { (stageJson \ FN.Class.entryName).extract[String] shouldBe stage.getClass.getName } - it should "write paramMap" in { - val params = (stageJson \ FN.ParamMap.entryName).extract[Map[String, Any]] + it should "write params map" in { + val params = extractParams(stageJson).extract[Map[String, Any]] if (hasOutputName) { params should have size 4 params.keys shouldBe Set("inputFeatures", "outputMetadata", "inputSchema", "outputFeatureName") @@ -81,17 +81,18 @@ private[stages] abstract class OpPipelineStageReaderWriterTest } } it should "write outputMetadata" in { - val metadataStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "outputMetadata")) + val params = extractParams(stageJson) + val metadataStr = compact(render(extractParams(stageJson) \ "outputMetadata")) val metadata = Metadata.fromJson(metadataStr) metadata shouldBe stage.getMetadata() } it should "write inputSchema" in { - val schemaStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "inputSchema")) + val schemaStr = compact(render(extractParams(stageJson) \ "inputSchema")) val schema = DataType.fromJson(schemaStr) schema shouldBe stage.getInputSchema() } it should "write input features" in { - val jArray = ((stageJson \ FN.ParamMap.entryName) \ "inputFeatures").extract[JArray] + val jArray = (extractParams(stageJson) \ "inputFeatures").extract[JArray] jArray.values should have length expectedFeaturesLength val obj = jArray(0).extract[JObject] obj.values.keys shouldBe Set("name", "isResponse", "isRaw", "uid", "typeName", "stages", "originFeatures") @@ -118,4 +119,10 @@ private[stages] abstract class OpPipelineStageReaderWriterTest stageLoaded.getInputSchema() shouldBe stage.getInputSchema() } + private def extractParams(stageJson: JValue): JValue = { + val defaultParamsMap = stageJson \ FN.DefaultParamMap.entryName + val paramsMap = stageJson \ FN.ParamMap.entryName + defaultParamsMap.merge(paramsMap) + } + } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala index ab8168cd6c..759d542b18 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala @@ -134,27 +134,22 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext with OpXGBoos .setLabelCol(labelF.name) val spk = cl.fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF), false) + compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } - def compareOutputs - ( - df1: DataFrame, - df2: DataFrame, - fullRawPred: Boolean = true - 
)(implicit arrayEquality: Equality[Array[Double]]): Unit = { + def compareOutputs(df1: DataFrame, df2: DataFrame)(implicit arrayEquality: Equality[Array[Double]]): Unit = { def keysStartsWith(name: String, value: Map[String, Double]): Array[Double] = { val names = value.keys.filter(_.startsWith(name)).toArray.sorted names.map(value) } + val sorted1 = df1.collect().sortBy(_.getAs[Double](4)) val sorted2 = df2.collect().sortBy(_.getAs[Map[String, Double]](2)(Prediction.Keys.PredictionName)) - sorted1.zip(sorted2).foreach{ case (r1, r2) => + sorted1.zip(sorted2).foreach { case (r1, r2) => val map = r2.getAs[Map[String, Double]](2) r1.getAs[Double](4) shouldEqual map(Prediction.Keys.PredictionName) r1.getAs[Vector](3).toArray shouldEqual keysStartsWith(Prediction.Keys.ProbabilityName, map) - if (fullRawPred) r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map) - else r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map).tail + r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map) } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala index 6b4fc3105b..10e368c7c9 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala @@ -53,7 +53,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, val (inputData, estimator) = { val numericData = Seq( - Map("a" -> 1.0), + Map("a" -> 1.0, "b" -> 1.0), Map("a" -> 18.0), Map("b" -> 0.0), Map("a" -> -1.23, "b" -> 1.0), @@ -66,7 +66,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, } val expectedResult = Seq( - Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), + Vectors.sparse(7, Array(1, 4, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(2, 3, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(0, 4, 6), Array(1.0, 1.0, 1.0)), diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala index 8e0eea4087..4b1a7d1e87 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala +++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala @@ -122,9 +122,10 @@ final class OpPipelineStageReader private // Update [[SparkWrapperParams]] with path so we can load the [[SparkStageParam]] instance val updatedMetadata = stage match { - case _: SparkWrapperParams[_] => - val updatedParams = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path) - metadata.copy(params = updatedParams) + case _: SparkWrapperParams[_] => metadata.copy( + params = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path), + defaultParams = SparkStageParam.updateParamsMetadataWithPath(metadata.defaultParams, path) + ) case _ => metadata } diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala index 180113b7cf..6c96eb6582 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala +++ 
b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala @@ -171,6 +171,9 @@ object OpPipelineStageReaderWriter extends OpPipelineStageReadWriteFormats { case object Uid extends FieldNames("uid") case object Class extends FieldNames("class") case object ParamMap extends FieldNames("paramMap") + case object DefaultParamMap extends FieldNames("defaultParamMap") + case object Timestamp extends FieldNames("timestamp") + case object SparkVersion extends FieldNames("sparkVersion") } /** diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala index 27ea60e30a..703b61414f 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala @@ -33,13 +33,13 @@ package com.salesforce.op.utils.spark import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureLike, FeatureSparkTypes, OPFeature} import com.salesforce.op.utils.text.TextUtils +import org.apache.avro.mapred.AvroInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} -import com.databricks.spark.avro._ import org.apache.spark.ml.linalg.{Vector, Vectors} -import scala.collection.mutable.{WrappedArray => MWrappedArray} +import scala.collection.mutable.{WrappedArray => MWrappedArray} import scala.reflect.ClassTag @@ -60,6 +60,34 @@ object RichDataset { private[op] def schemaPath(path: String): String = s"${path.stripSuffix("/")}/schema" private[op] def dataPath(path: String): String = s"${path.stripSuffix("/")}/data" + private val AvroFormat = "avro" + + implicit class RichDataFrameWriter[T](w: DataFrameWriter[T]) { + + /** + * Saves the content of the `DataFrame` in Avro format at the specified path. + * This is equivalent to: + * {{{ + * format("avro").save(path) + * }}} + */ + def avro(path: String): Unit = w.format(AvroFormat).save(path) + + } + + implicit class RichDataFrameReader(r: DataFrameReader) { + + /** + * Loads Avro files and returns the result as a `DataFrame`. + * This is equivalent to: + * {{{ + * format("avro").load(path) + * }}} + */ + def avro(path: String): DataFrame = r.format(AvroFormat).load(path) + + } + /** * Loads a dataframe from a saved Avro file and dataframe schema file generated by RichDataFrame.saveAvro. 
* Relies on spark-avro package for Avro file generation, which seems to have a bug/feature that makes all fields @@ -84,7 +112,6 @@ object RichDataset { data.select(columns: _*) } - /** * A dataframe with three quantifiers: forall, exists, and forNone (see below) * the rest of extended functionality comes from RichDataset diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 8179b2b403..3d697588ba 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -21,6 +21,7 @@ package org.apache.spark.ml import com.salesforce.op.stages.OpPipelineStageBase +import com.salesforce.op.stages.OpPipelineStageReaderWriter.FieldNames._ import org.apache.spark.ml.param.ParamPair import org.apache.spark.ml.util.{DefaultParamsReader, DefaultParamsWriter} import org.json4s.JsonDSL._ @@ -47,21 +48,26 @@ case object SparkDefaultParamsReadWrite { * @see [[OpPipelineStageWriter]] for details on what this includes. */ def getMetadataToSave( - stage: OpPipelineStageBase, + instance: OpPipelineStageBase, extraMetadata: Option[JObject] = None, paramMap: Option[JValue] = None ): JObject = { - val uid = stage.uid - val cls = stage.getClass.getName - val params = stage.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] + val uid = instance.uid + val cls = instance.getClass.getName + val params = instance.paramMap.toSeq + val defaultParams = instance.defaultParamMap.toSeq val jsonParams = paramMap.getOrElse(render(params.map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList)) - val basicMetadata = ("class" -> cls) ~ - ("timestamp" -> System.currentTimeMillis()) ~ - ("sparkVersion" -> org.apache.spark.SPARK_VERSION) ~ - ("uid" -> uid) ~ - ("paramMap" -> jsonParams) + val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, v) => + p.name -> parse(p.jsonEncode(v)) + }.toList) + val basicMetadata = (Class.entryName -> cls) ~ + (Timestamp.entryName -> System.currentTimeMillis()) ~ + (SparkVersion.entryName -> org.apache.spark.SPARK_VERSION) ~ + (Uid.entryName -> uid) ~ + (ParamMap.entryName -> jsonParams) ~ + (DefaultParamMap.entryName -> jsonDefaultParams) val metadata = extraMetadata match { case Some(jObject) => basicMetadata ~ jObject @@ -75,19 +81,48 @@ case object SparkDefaultParamsReadWrite { * Parse metadata JSON string produced by [[DefaultParamsWriter.getMetadataToSave()]]. * This is a helper function for [[loadMetadata()]]. * + * Note: this method was taken from [[DefaultParamsWriter.parseMetadata]], + * but modified to avoid failing on loading of Spark models prior to 2.4.x + * * @param metadataStr JSON string of metadata * @param expectedClassName If non empty, this is checked against the loaded metadata. 
* @throws IllegalArgumentException if expectedClassName is specified and does not match metadata */ - def parseMetadata(jsonStr: String): Metadata = - DefaultParamsReader.parseMetadata(jsonStr) + def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = { + val metadata = parse(metadataStr) + + val className = (metadata \ Class.entryName).extract[String] + val uid = (metadata \ Uid.entryName).extract[String] + val timestamp = (metadata \ Timestamp.entryName).extract[Long] + val sparkVersion = (metadata \ SparkVersion.entryName).extract[String] + val params = metadata \ ParamMap.entryName + val defaultParams = metadata \ DefaultParamMap.entryName + if (expectedClassName.nonEmpty) { + require(className == expectedClassName, s"Error loading metadata: Expected class name" + + s" $expectedClassName but found class name $className") + } + // ****************************************************************************************** + /* + * Backward compatible fix for models trained with older versions of Spark (prior to 2.4.x). + * The change introduced in https://github.com/apache/spark/pull/20633 added serialization of + * default params, older models won't have them and fail to load. + */ + val defaultParamsFix = if (defaultParams == JNothing) JObject() else defaultParams + // ****************************************************************************************** + + new Metadata(className, uid, timestamp, sparkVersion, params, defaultParamsFix, metadata, metadataStr) + } /** * Extract Params from metadata, and set them in the instance. - * This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]]. - * TODO: Move to [[Metadata]] method + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. 
*/ - def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata): Unit = - DefaultParamsReader.getAndSetParams(stage, metadata) + def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata, skipParams: Option[List[String]] = None): Unit = + metadata.getAndSetParams(stage, skipParams) } diff --git a/gradle.properties b/gradle.properties index ab8c90bea4..f8fb7ad658 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.6.2-SNAPSHOT +version=0.7.0-SNAPSHOT group=com.salesforce.transmogrifai org.gradle.caching=true diff --git a/helloworld/build.gradle b/helloworld/build.gradle index 3420f5fc95..760e95a23a 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -5,11 +5,11 @@ buildscript { } dependencies { classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' } } plugins { id 'com.github.johnrengelman.shadow' version '5.0.0' - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'org.scoverage' version '2.5.0' } repositories { @@ -36,8 +36,8 @@ mainClassName = "please.set.main.class.in.build.gradle" ext { scalaVersion = '2.11' scalaVersionRevision = '12' - junitVersion = '4.11' - sparkVersion = '2.3.2' + junitVersion = '4.12' + sparkVersion = '2.4.4' scalatestVersion = '3.0.0' transmogrifaiVersion ='0.6.1' collectionsVersion = '3.2.2' diff --git a/helloworld/notebooks/OpHousingPrices.ipynb b/helloworld/notebooks/OpHousingPrices.ipynb index 1245753ff5..76c6c59e17 100644 --- a/helloworld/notebooks/OpHousingPrices.ipynb +++ b/helloworld/notebooks/OpHousingPrices.ipynb @@ -16,7 +16,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { diff --git a/helloworld/notebooks/OpIris.ipynb b/helloworld/notebooks/OpIris.ipynb index 4ae4bbf940..3e2fecd3d1 100644 --- a/helloworld/notebooks/OpIris.ipynb +++ b/helloworld/notebooks/OpIris.ipynb @@ -17,7 +17,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { @@ -26,7 +26,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.2" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, { diff --git a/helloworld/notebooks/OpTitanicSimple.ipynb b/helloworld/notebooks/OpTitanicSimple.ipynb index ffc9a3236b..d1a82d3f1c 100644 --- a/helloworld/notebooks/OpTitanicSimple.ipynb +++ b/helloworld/notebooks/OpTitanicSimple.ipynb @@ -22,7 +22,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { @@ -31,7 +31,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.2" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, { diff --git a/pom.xml b/pom.xml index 59cc8a4799..401b01e0b2 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.salesforce.transmogrifai TransmogrifAI - 0.6.1 + 0.7.0 TransmogrifAI AutoML library for building modular, reusable, strongly typed machine learning workflows on Spark with minimal hand tuning https://github.com/salesforce/TransmogrifAI @@ -108,7 +108,7 @@ ml.dmlc 
xgboost4j-spark - 0.81 + 0.90 compile @@ -164,21 +164,9 @@ org.json4s json4s-ext_2.11 - 3.2.11 + 3.5.3 compile - - com.databricks - spark-avro_2.11 - 4.0.0 - compile - - - avro - org.apache.avro - - - com.fasterxml.jackson.dataformat jackson-dataformat-yaml @@ -200,7 +188,7 @@ com.twitter chill-avro_2.11 - 0.8.4 + 0.9.3 compile @@ -212,7 +200,7 @@ com.twitter chill-algebird_2.11 - 0.8.4 + 0.9.3 compile @@ -254,13 +242,13 @@ org.apache.spark spark-sql_2.11 - 2.3.2 + 2.4.4 compile org.apache.avro avro - 1.7.7 + 1.8.2 compile @@ -272,13 +260,13 @@ ml.combust.mleap mleap-spark_2.11 - 0.13.0 + 0.14.0 compile ml.combust.mleap mleap-runtime_2.11 - 0.13.0 + 0.14.0 compile diff --git a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala index 031703e900..ab56e16cfb 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala @@ -30,10 +30,9 @@ package com.salesforce.op.readers -import com.databricks.spark.avro.SchemaConverters +import org.apache.spark.sql.avro.SchemaConverters import com.salesforce.op.OpParams import com.salesforce.op.utils.io.csv.{CSVInOut, CSVOptions, CSVToAvro} -import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecord import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.datasources.csv.CSVSchemaUtils @@ -74,10 +73,11 @@ class CSVAutoReader[T <: GenericRecord : ClassTag] val hdrsSet = hdrs.toSet val data = csvData.filter(_.exists(!hdrsSet.contains(_))) - val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options) - val builder = SchemaBuilder.record(recordName).namespace(recordNamespace) - val schema = SchemaConverters.convertStructToAvro(inferredSchema, builder, recordNamespace) - + val columnPrunning = spark.sessionState.conf.csvColumnPruning + val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options, columnPrunning) + val schema = SchemaConverters.toAvroType( + inferredSchema, nullable = false, recordName = recordName, nameSpace = recordNamespace + ) val avroData: RDD[T] = CSVToAvro.toAvroTyped[T](data, schema.toString, timeZone) maybeRepartition(avroData, params) } diff --git a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala index 8a633b9739..6d8b4a9593 100644 --- a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala +++ b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala @@ -43,18 +43,22 @@ case object CSVSchemaUtils { * 2. Merge row types to find common type * 3. Replace any null types with string type * - * @param rdd data - * @param header CSV header - * @param options CSV options + * @param rdd data + * @param header CSV header + * @param options CSV options + * @param columnPruning If it is set to true, column names of the requested schema are passed to CSV parser. + * Other column values can be ignored during parsing even if they are malformed. 
* @return inferred schema */ def infer( rdd: RDD[Array[String]], header: Seq[String], - options: com.salesforce.op.utils.io.csv.CSVOptions + options: com.salesforce.op.utils.io.csv.CSVOptions, + columnPruning: Boolean = true ): StructType = { val opts = new org.apache.spark.sql.execution.datasources.csv.CSVOptions( parameters = options.copy(header = false).toSparkCSVOptionsMap + ("inferSchema" -> true.toString), + columnPruning = columnPruning, defaultTimeZoneId = "GMT" ) CSVInferSchema.infer(rdd, header.toArray, opts) diff --git a/templates/simple/build.gradle.template b/templates/simple/build.gradle.template index a9bb81153d..fd70005fdd 100644 --- a/templates/simple/build.gradle.template +++ b/templates/simple/build.gradle.template @@ -1,15 +1,16 @@ buildscript { repositories { mavenCentral() + jcenter() maven { url "https://plugins.gradle.org/m2/" } } - //dependencies { + dependencies { + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' // classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' - //} + } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'com.github.johnrengelman.shadow' version '5.0.0' } @@ -79,8 +80,7 @@ dependencies { testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } - + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" } configurations.all { diff --git a/utils/build.gradle b/utils/build.gradle index f18024ec72..3915d616b6 100644 --- a/utils/build.gradle +++ b/utils/build.gradle @@ -7,7 +7,7 @@ dependencies { testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" // Jackson Yaml compile ("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") { exclude group: "com.fasterxml.jackson.core" } diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala index e02aaf6a3b..49ac6a07c5 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala @@ -95,7 +95,7 @@ object CSVToAvro { val value = if (index < columns.size) columns(index) else try { - field.defaultValue().asText() + field.defaultVal().toString } catch { case e: Exception => throw new InvalidParameterException("Mismatch number of fields in csv record and avro schema") diff --git a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala index dcf04ee337..1dfcb0b898 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala @@ -31,19 +31,22 @@ package com.salesforce.op.utils.json import org.json4s.CustomSerializer -import org.json4s.JsonAST.JString +import org.json4s.JsonAST.{JDouble, JString, JDecimal} /** * Json4s serializer for marshalling special Double values: NaN, -Infinity and Infinity */ // scalastyle:off -class 
SpecialDoubleSerializer extends CustomSerializer[Double](_ => +class SpecialDoubleSerializer extends CustomSerializer[Double](ser => ({ case JString("NaN") => Double.NaN case JString("-Infinity") => Double.NegativeInfinity case JString("Infinity") => Double.PositiveInfinity + case JDouble(v) => v }, { case v: Double if v.isNaN => JString("NaN") case Double.NegativeInfinity => JString("-Infinity") case Double.PositiveInfinity => JString("Infinity") + case v: Double if ser.wantsBigDecimal => JDecimal(v) + case v: Double => JDouble(v) })) diff --git a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala index 87ed5548ef..9138b04a12 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala @@ -32,7 +32,7 @@ package com.salesforce.op.utils.json import com.salesforce.op.test.TestCommon import org.json4s.jackson.JsonMethods._ -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -41,8 +41,6 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { - implicit val formats = DefaultFormats + new SpecialDoubleSerializer - val data = Map( "normal" -> Seq(-1.1, 0.0, 2.3), "infs" -> Seq(Double.NegativeInfinity, Double.PositiveInfinity), @@ -50,18 +48,28 @@ class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { "nan" -> Seq(Double.NaN) ) - val dataJson = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + Spec[SpecialDoubleSerializer] should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats + new SpecialDoubleSerializer) - Spec[SpecialDoubleSerializer] should "write double entries" in { - compact(Extraction.decompose(data)) shouldBe dataJson - } - it should "read double entries" in { - val parsed = parse(dataJson).extract[Map[String, Seq[Double]]] - parsed.keys shouldBe data.keys + Spec[SpecialDoubleSerializer] + " (with big decimal)" should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E+308,1.7976931348623157E+308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats.withBigDecimal + new SpecialDoubleSerializer) - parsed zip data foreach { - case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) - case ((_, a), (_, b)) => a should contain theSameElementsAs b + + def readWriteDoubleValues(input: Map[String, Seq[Double]])(json: String)(implicit formats: Formats): Unit = { + it should "write double entries" in { + compact(Extraction.decompose(input)) shouldBe json + } + it should "read double entries" in { + val parsed = parse(json).extract[Map[String, Seq[Double]]] + parsed.keys shouldBe input.keys + parsed zip input foreach { + case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) + case ((_, a), (_, b)) => a should contain theSameElementsAs b + } } } }
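For quick reference alongside the serializer changes above, a minimal sketch of registering `SpecialDoubleSerializer` with json4s formats, mirroring the updated test; the map values are illustrative only:

```scala
import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods._
import com.salesforce.op.utils.json.SpecialDoubleSerializer

// Register the serializer: NaN/Infinity round-trip as strings, ordinary doubles as JDouble
implicit val formats: Formats = DefaultFormats + new SpecialDoubleSerializer

val json = compact(Extraction.decompose(Map("nan" -> Seq(Double.NaN), "x" -> Seq(-1.1, 2.3))))
// json == """{"nan":["NaN"],"x":[-1.1,2.3]}"""

val roundTrip = parse(json).extract[Map[String, Seq[Double]]]

// With big-decimal formats, ordinary doubles are emitted as JDecimal instead
// (e.g. 1.7976931348623157E+308, as in the test above):
// implicit val bigDecimalFormats: Formats = DefaultFormats.withBigDecimal + new SpecialDoubleSerializer
```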