From 77e2229dd40cfd805a98f05595706229aa821c2d Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Tue, 3 Sep 2019 22:10:38 -0700 Subject: [PATCH 01/45] Revert "Revert back to Spark 2.3 (#399)" This reverts commit 95a77b17269a71bf0d53c54df7d76f0bfe862275. --- README.md | 2 +- build.gradle | 22 +++---- cli/build.gradle | 6 +- .../com/salesforce/op/cli/SchemaSource.scala | 2 +- .../salesforce/op/cli/gen/AvroFieldTest.scala | 4 +- .../impl/classification/OpLinearSVC.scala | 4 +- .../classification/OpXGBoostClassifier.scala | 26 +++++++- .../regression/OpDecisionTreeRegressor.scala | 6 +- .../impl/regression/OpGBTRegressor.scala | 6 +- .../impl/regression/OpLinearRegression.scala | 6 +- .../regression/OpRandomForestRegressor.scala | 7 +- .../impl/regression/OpXGBoostRegressor.scala | 26 ++++++-- .../stages/impl/selector/ModelSelector.scala | 1 - .../specific/OpPredictionModel.scala | 7 +- .../xgboost4j/scala/spark/XGBoostParams.scala | 6 +- .../com/salesforce/op/ModelInsightsTest.scala | 9 ++- .../OpPipelineStageReaderWriterTest.scala | 15 +++-- .../OpClassifierModelTest.scala | 15 ++--- ...DecisionTreeNumericMapBucketizerTest.scala | 4 +- .../op/stages/OpPipelineStageReader.scala | 7 +- .../stages/OpPipelineStageReaderWriter.scala | 3 + .../op/utils/spark/RichDataset.scala | 33 +++++++++- .../ml/SparkDefaultParamsReadWrite.scala | 65 ++++++++++++++----- pom.xml | 24 ++----- .../op/readers/CSVAutoReaders.scala | 12 ++-- .../datasources/csv/CSVSchemaUtils.scala | 12 ++-- templates/simple/build.gradle.template | 10 +-- utils/build.gradle | 2 +- .../op/utils/io/csv/CSVToAvro.scala | 2 +- .../utils/json/SpecialDoubleSerializer.scala | 7 +- .../json/SpecialDoubleSerializerTest.scala | 34 ++++++---- 31 files changed, 237 insertions(+), 148 deletions(-) diff --git a/README.md b/README.md index c81f2e9be0..9f9b680988 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Start by picking TransmogrifAI version to match your project dependencies from t | TransmogrifAI Version | Spark Version | Scala Version | Java Version | |-------------------------------------------------|:-------------:|:-------------:|:------------:| -| 0.6.1 (unreleased, master) | 2.3 | 2.11 | 1.8 | +| 0.6.1 (unreleased, master) | 2.4 | 2.11 | 1.8 | | **0.6.0 (stable)**, 0.5.3, 0.5.2, 0.5.1, 0.5.0 | **2.3** | **2.11** | **1.8** | | 0.4.0, 0.3.4 | 2.2 | 2.11 | 1.8 | diff --git a/build.gradle b/build.gradle index f1ec9672ae..06dc54f80c 100644 --- a/build.gradle +++ b/build.gradle @@ -1,15 +1,16 @@ buildscript { repositories { - maven { url "https://plugins.gradle.org/m2/" } mavenCentral() + jcenter() + maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'org.scoverage' version '2.5.0' id 'net.minecrell.licenser' version '0.4.1' id 'com.github.jk1.dependency-license-report' version '0.5.0' @@ -57,14 +58,13 @@ configure(allProjs) { scalaVersionRevision = '12' scalaTestVersion = '3.0.5' scalaCheckVersion = '1.14.0' - junitVersion = '4.11' - avroVersion = '1.7.7' - sparkVersion = '2.3.2' - sparkAvroVersion = '4.0.0' + junitVersion = '4.12' + avroVersion = '1.8.2' + sparkVersion = '2.4.3' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' - json4sVersion = '3.2.11' // matches Spark dependency version + json4sVersion = '3.5.3' // matches Spark dependency version jodaTimeVersion 
= '2.9.4' jodaConvertVersion = '1.8.1' algebirdVersion = '0.13.4' @@ -75,20 +75,20 @@ configure(allProjs) { googleLibPhoneNumberVersion = '8.8.5' googleGeoCoderVersion = '2.82' googleCarrierVersion = '1.72' - chillVersion = '0.8.4' + chillVersion = '0.9.3' reflectionsVersion = '0.9.11' collectionsVersion = '3.2.2' optimaizeLangDetectorVersion = '0.0.1' tikaVersion = '1.22' - sparkTestingBaseVersion = '2.3.1_0.10.0' + sparkTestingBaseVersion = '2.4.3_0.12.0' sourceCodeVersion = '0.1.3' pegdownVersion = '1.4.2' commonsValidatorVersion = '1.6' commonsIOVersion = '2.6' scoveragePluginVersion = '1.3.1' - xgboostVersion = '0.81' + xgboostVersion = '0.90' akkaSlf4jVersion = '2.3.11' - mleapVersion = '0.13.0' + mleapVersion = '0.14.0' memoryFilesystemVersion = '2.1.0' } diff --git a/cli/build.gradle b/cli/build.gradle index 3062917744..3d6e9ffd9c 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -69,7 +69,6 @@ task copyTemplates(type: Copy) { fileName.replace(".gradle.template", ".gradle") } expand([ - databaseHostname: 'db.company.com', version: scalaVersion, scalaVersion: scalaVersion, scalaVersionRevision: scalaVersionRevision, @@ -77,12 +76,9 @@ task copyTemplates(type: Copy) { junitVersion: junitVersion, sparkVersion: sparkVersion, avroVersion: avroVersion, - sparkAvroVersion: sparkAvroVersion, hadoopVersion: hadoopVersion, collectionsVersion: collectionsVersion, - transmogrifaiVersion: version, - buildNumber: (int)(Math.random() * 1000), - date: new Date() + transmogrifaiVersion: version ]) } diff --git a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala index ee863c8a81..5ad27f866b 100644 --- a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala +++ b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala @@ -138,7 +138,7 @@ case class AutomaticSchema(recordClassName: String)(dataFile: File) extends Sche case Some(actualType) => val newSchema = Schema.create(actualType) val schemaField = - new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultValue) + new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultVal()) AvroField.from(schemaField) } } else field diff --git a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala index 80bce98cab..38a686a711 100644 --- a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala +++ b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala @@ -69,7 +69,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { val allSchemas = (enum::unions)++simpleSchemas // NULL does not work val fields = allSchemas.zipWithIndex map { - case (s, i) => new Schema.Field("x" + i, s, "Who", null) + case (s, i) => new Schema.Field("x" + i, s, "Who", null: Object) } val expected = List( @@ -86,7 +86,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { an[IllegalArgumentException] should be thrownBy { val nullSchema = Schema.create(Schema.Type.NULL) - val nullField = new Schema.Field("xxx", null, "Nobody", null) + val nullField = new Schema.Field("xxx", null, "Nobody", null: Object) AvroField from nullField } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala index 425d43a866..1275e3d163 100644 --- 
a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala @@ -152,14 +152,14 @@ class OpLinearSVCModel ) extends OpPredictorWrapperModel[LinearSVCModel](uid = uid, operationName = operationName, sparkModel = sparkModel) { @transient lazy private val predictRaw = reflectMethod(getSparkMlStage().get, "predictRaw") - @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy private val predict: Vector => Double = getSparkMlStage().get.predict(_) /** * Function used to convert input to output */ override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { val raw = predictRaw(features.value).asInstanceOf[Vector] - val pred = predict(features.value).asInstanceOf[Double] + val pred = predict(features.value) Prediction(rawPrediction = raw, prediction = pred) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala index 30f9801fb4..3c07a58787 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala @@ -235,6 +235,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + /** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -282,8 +287,19 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -310,6 +326,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. 
default: null */ @@ -359,17 +380,18 @@ class OpXGBoostClassificationModel private lazy val model = getSparkMlStage().get private lazy val booster = model.nativeBooster - private lazy val treeLimit = model.getTreeLimit.toInt + private lazy val treeLimit = model.getTreeLimit private lazy val missing = model.getMissing override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { - val data = removeMissingValues(Iterator(features.value.asXGB), missing) + val data = processMissingValues(Iterator(features.value.asXGB), missing) val dm = new DMatrix(dataIter = data) val rawPred = booster.predict(dm, outPutMargin = true, treeLimit = treeLimit)(0).map(_.toDouble) val rawPrediction = if (model.numClasses == 2) Array(-rawPred(0), rawPred(0)) else rawPred val prob = booster.predict(dm, outPutMargin = false, treeLimit = treeLimit)(0).map(_.toDouble) val probability = if (model.numClasses == 2) Array(1.0 - prob(0), prob(0)) else prob val prediction = probability2predictionMirror(Vectors.dense(probability)).asInstanceOf[Double] + Prediction(prediction = prediction, rawPrediction = rawPrediction, probability = probability) } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala index f42e1e50ed..39a5735949 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor, OpDecisionTreeRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -113,7 +112,4 @@ class OpDecisionTreeRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[DecisionTreeRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala index a8d69c9f14..b5717b49a4 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor, OpGBTRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -139,7 +138,4 @@ class OpGBTRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[GBTRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, 
"predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala index 780a496b60..e0da705c9d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala @@ -34,7 +34,6 @@ import com.salesforce.op._ import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, OpLinearRegressionParams} import scala.reflect.runtime.universe.TypeTag @@ -180,7 +179,4 @@ class OpLinearRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[LinearRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala index 4b1aca8265..4b0fdbd1d5 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{OpRandomForestRegressorParams, RandomForestRegressionModel, RandomForestRegressor} import scala.reflect.runtime.universe.TypeTag @@ -126,8 +125,4 @@ class OpRandomForestRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[RandomForestRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala index 688f34f812..8e2eaaf49d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait} import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostRegressorParams, TrackerConf, XGBoostRegressionModel, XGBoostRegressor} @@ -234,6 +233,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + 
/** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -281,8 +285,19 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -309,6 +324,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. default: null */ @@ -341,6 +361,4 @@ class OpXGBoostRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[XGBoostRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala index a7e5dbac68..f32aae608e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelector.scala @@ -41,7 +41,6 @@ import com.salesforce.op.stages.impl.tuning.{BestEstimator, _} import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapperModel, SparkModelConverter} import com.salesforce.op.utils.spark.RichMetadata._ -import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichParamMap._ import com.salesforce.op.utils.stages.FitStagesUtil._ import org.apache.spark.ml.param._ diff --git a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala index cfcaae7278..bc59b13ba8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala +++ b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala @@ -52,9 +52,10 @@ abstract class OpPredictionModel[T <: PredictionModel[Vector, T]] operationName: String ) extends OpPredictorWrapperModel[T](uid = uid, operationName = operationName, sparkModel = sparkModel) { - protected def predictMirror: MethodMirror - - protected def 
predict(features: Vector): Double = predictMirror.apply(features).asInstanceOf[Double] + /** + * Predict label for the given features + */ + @transient protected lazy val predict: Vector => Double = getSparkMlStage().get.predict(_) /** * Function used to convert input to output diff --git a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala index 88077fa63f..3c8959997d 100644 --- a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala +++ b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala @@ -104,8 +104,8 @@ case object OpXGBoost { } /** - * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.removeMissingValues]] private method + * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.processMissingValues]] private method */ - def removeMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = - XGBoost.removeMissingValues(xgbLabelPoints, missing) + def processMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = + XGBoost.processMissingValues(xgbLabelPoints, missing) } diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala index e610efd916..40501348bf 100644 --- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala +++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala @@ -44,6 +44,7 @@ import com.salesforce.op.stages.impl.tuning.{DataCutter, DataSplitter} import com.salesforce.op.test.{PassengerSparkFixtureTest, TestFeatureBuilder} import com.salesforce.op.testkit.RandomReal import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} +import ml.dmlc.xgboost4j.scala.spark.OpXGBoostQuietLogging import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder import org.junit.runner.RunWith @@ -51,7 +52,6 @@ import com.salesforce.op.features.types.Real import com.salesforce.op.stages.impl.feature.{CombinationStrategy, TextStats} import com.twitter.algebird.Moments import org.apache.spark.sql.{DataFrame, Dataset} -import org.scalactic.Equality import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import org.apache.spark.sql.functions._ @@ -59,7 +59,7 @@ import org.apache.spark.sql.functions._ import scala.util.{Failure, Success} @RunWith(classOf[JUnitRunner]) -class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality { +class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality with OpXGBoostQuietLogging { private val density = weight / height private val generVec = genderPL.vectorize(topK = 10, minSupport = 1, cleanText = true) @@ -76,8 +76,8 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou val lrParams = new ParamGridBuilder().addGrid(lr.regParam, Array(0.01, 0.1)).build() val models = Seq(lr -> lrParams).asInstanceOf[Seq[(EstimatorType, Array[ParamMap])]] - val xgbClassifier = new OpXGBoostClassifier().setSilent(1).setSeed(42L) - val xgbRegressor = new OpXGBoostRegressor().setSilent(1).setSeed(42L) + val xgbClassifier = new OpXGBoostClassifier().setMissing(0.0f).setSilent(1).setSeed(42L) + val xgbRegressor = new OpXGBoostRegressor().setMissing(0.0f).setSilent(1).setSeed(42L) val xgbClassifierPred = xgbClassifier.setInput(label, features).getOutput() val xgbRegressorPred = xgbRegressor.setInput(label, 
features).getOutput() lazy val xgbWorkflow = @@ -447,7 +447,6 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou } it should "correctly serialize and deserialize from json when raw feature filter is used" in { - val insights = modelWithRFF.modelInsights(predWithMaps) ModelInsights.fromJson(insights.toJson()) match { case Failure(e) => fail(e) diff --git a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala index 8791277d9e..05e6cdfcfa 100644 --- a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala @@ -70,8 +70,8 @@ private[stages] abstract class OpPipelineStageReaderWriterTest it should "write class name" in { (stageJson \ FN.Class.entryName).extract[String] shouldBe stage.getClass.getName } - it should "write paramMap" in { - val params = (stageJson \ FN.ParamMap.entryName).extract[Map[String, Any]] + it should "write params map" in { + val params = extractParams(stageJson).extract[Map[String, Any]] if (hasOutputName) { params should have size 4 params.keys shouldBe Set("inputFeatures", "outputMetadata", "inputSchema", "outputFeatureName") @@ -81,12 +81,13 @@ private[stages] abstract class OpPipelineStageReaderWriterTest } } it should "write outputMetadata" in { - val metadataStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "outputMetadata")) + val params = extractParams(stageJson) + val metadataStr = compact(render(extractParams(stageJson) \ "outputMetadata")) val metadata = Metadata.fromJson(metadataStr) metadata shouldBe stage.getMetadata() } it should "write inputSchema" in { - val schemaStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "inputSchema")) + val schemaStr = compact(render(extractParams(stageJson) \ "inputSchema")) val schema = DataType.fromJson(schemaStr) schema shouldBe stage.getInputSchema() } @@ -118,4 +119,10 @@ private[stages] abstract class OpPipelineStageReaderWriterTest stageLoaded.getInputSchema() shouldBe stage.getInputSchema() } + private def extractParams(stageJson: JValue): JValue = { + val defaultParamsMap = stageJson \ FN.DefaultParamMap.entryName + val paramsMap = stageJson \ FN.ParamMap.entryName + defaultParamsMap.merge(paramsMap) + } + } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala index ab8168cd6c..759d542b18 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala @@ -134,27 +134,22 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext with OpXGBoos .setLabelCol(labelF.name) val spk = cl.fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF), false) + compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } - def compareOutputs - ( - df1: DataFrame, - df2: DataFrame, - fullRawPred: Boolean = true - )(implicit arrayEquality: Equality[Array[Double]]): Unit = { + def compareOutputs(df1: DataFrame, df2: DataFrame)(implicit arrayEquality: Equality[Array[Double]]): Unit = { def keysStartsWith(name: String, value: Map[String, Double]): Array[Double] = { val names = 
value.keys.filter(_.startsWith(name)).toArray.sorted names.map(value) } + val sorted1 = df1.collect().sortBy(_.getAs[Double](4)) val sorted2 = df2.collect().sortBy(_.getAs[Map[String, Double]](2)(Prediction.Keys.PredictionName)) - sorted1.zip(sorted2).foreach{ case (r1, r2) => + sorted1.zip(sorted2).foreach { case (r1, r2) => val map = r2.getAs[Map[String, Double]](2) r1.getAs[Double](4) shouldEqual map(Prediction.Keys.PredictionName) r1.getAs[Vector](3).toArray shouldEqual keysStartsWith(Prediction.Keys.ProbabilityName, map) - if (fullRawPred) r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map) - else r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map).tail + r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map) } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala index 6b4fc3105b..10e368c7c9 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala @@ -53,7 +53,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, val (inputData, estimator) = { val numericData = Seq( - Map("a" -> 1.0), + Map("a" -> 1.0, "b" -> 1.0), Map("a" -> 18.0), Map("b" -> 0.0), Map("a" -> -1.23, "b" -> 1.0), @@ -66,7 +66,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, } val expectedResult = Seq( - Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), + Vectors.sparse(7, Array(1, 4, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(2, 3, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(0, 4, 6), Array(1.0, 1.0, 1.0)), diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala index 8e0eea4087..4b1a7d1e87 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala +++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala @@ -122,9 +122,10 @@ final class OpPipelineStageReader private // Update [[SparkWrapperParams]] with path so we can load the [[SparkStageParam]] instance val updatedMetadata = stage match { - case _: SparkWrapperParams[_] => - val updatedParams = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path) - metadata.copy(params = updatedParams) + case _: SparkWrapperParams[_] => metadata.copy( + params = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path), + defaultParams = SparkStageParam.updateParamsMetadataWithPath(metadata.defaultParams, path) + ) case _ => metadata } diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala index ef63f5d50c..aec70ef9df 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala +++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala @@ -171,6 +171,9 @@ object OpPipelineStageReaderWriter extends OpPipelineStageReadWriteFormats { case object Uid extends FieldNames("uid") case object Class extends FieldNames("class") 
case object ParamMap extends FieldNames("paramMap") + case object DefaultParamMap extends FieldNames("defaultParamMap") + case object Timestamp extends FieldNames("timestamp") + case object SparkVersion extends FieldNames("sparkVersion") } /** diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala index 27ea60e30a..703b61414f 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala @@ -33,13 +33,13 @@ package com.salesforce.op.utils.spark import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureLike, FeatureSparkTypes, OPFeature} import com.salesforce.op.utils.text.TextUtils +import org.apache.avro.mapred.AvroInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} -import com.databricks.spark.avro._ import org.apache.spark.ml.linalg.{Vector, Vectors} -import scala.collection.mutable.{WrappedArray => MWrappedArray} +import scala.collection.mutable.{WrappedArray => MWrappedArray} import scala.reflect.ClassTag @@ -60,6 +60,34 @@ object RichDataset { private[op] def schemaPath(path: String): String = s"${path.stripSuffix("/")}/schema" private[op] def dataPath(path: String): String = s"${path.stripSuffix("/")}/data" + private val AvroFormat = "avro" + + implicit class RichDataFrameWriter[T](w: DataFrameWriter[T]) { + + /** + * Saves the content of the `DataFrame` in Avro format at the specified path. + * This is equivalent to: + * {{{ + * format("avro").save(path) + * }}} + */ + def avro(path: String): Unit = w.format(AvroFormat).save(path) + + } + + implicit class RichDataFrameReader(r: DataFrameReader) { + + /** + * Loads Avro files and returns the result as a `DataFrame`. + * This is equivalent to: + * {{{ + * format("avro").load(path) + * }}} + */ + def avro(path: String): DataFrame = r.format(AvroFormat).load(path) + + } + /** * Loads a dataframe from a saved Avro file and dataframe schema file generated by RichDataFrame.saveAvro. * Relies on spark-avro package for Avro file generation, which seems to have a bug/feature that makes all fields @@ -84,7 +112,6 @@ object RichDataset { data.select(columns: _*) } - /** * A dataframe with three quantifiers: forall, exists, and forNone (see below) * the rest of extended functionality comes from RichDataset diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 8179b2b403..3d697588ba 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -21,6 +21,7 @@ package org.apache.spark.ml import com.salesforce.op.stages.OpPipelineStageBase +import com.salesforce.op.stages.OpPipelineStageReaderWriter.FieldNames._ import org.apache.spark.ml.param.ParamPair import org.apache.spark.ml.util.{DefaultParamsReader, DefaultParamsWriter} import org.json4s.JsonDSL._ @@ -47,21 +48,26 @@ case object SparkDefaultParamsReadWrite { * @see [[OpPipelineStageWriter]] for details on what this includes. 
*/ def getMetadataToSave( - stage: OpPipelineStageBase, + instance: OpPipelineStageBase, extraMetadata: Option[JObject] = None, paramMap: Option[JValue] = None ): JObject = { - val uid = stage.uid - val cls = stage.getClass.getName - val params = stage.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] + val uid = instance.uid + val cls = instance.getClass.getName + val params = instance.paramMap.toSeq + val defaultParams = instance.defaultParamMap.toSeq val jsonParams = paramMap.getOrElse(render(params.map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList)) - val basicMetadata = ("class" -> cls) ~ - ("timestamp" -> System.currentTimeMillis()) ~ - ("sparkVersion" -> org.apache.spark.SPARK_VERSION) ~ - ("uid" -> uid) ~ - ("paramMap" -> jsonParams) + val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, v) => + p.name -> parse(p.jsonEncode(v)) + }.toList) + val basicMetadata = (Class.entryName -> cls) ~ + (Timestamp.entryName -> System.currentTimeMillis()) ~ + (SparkVersion.entryName -> org.apache.spark.SPARK_VERSION) ~ + (Uid.entryName -> uid) ~ + (ParamMap.entryName -> jsonParams) ~ + (DefaultParamMap.entryName -> jsonDefaultParams) val metadata = extraMetadata match { case Some(jObject) => basicMetadata ~ jObject @@ -75,19 +81,48 @@ case object SparkDefaultParamsReadWrite { * Parse metadata JSON string produced by [[DefaultParamsWriter.getMetadataToSave()]]. * This is a helper function for [[loadMetadata()]]. * + * Note: this method was taken from [[DefaultParamsWriter.parseMetadata]], + * but modified to avoid failing on loading of Spark models prior to 2.4.x + * * @param metadataStr JSON string of metadata * @param expectedClassName If non empty, this is checked against the loaded metadata. * @throws IllegalArgumentException if expectedClassName is specified and does not match metadata */ - def parseMetadata(jsonStr: String): Metadata = - DefaultParamsReader.parseMetadata(jsonStr) + def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = { + val metadata = parse(metadataStr) + + val className = (metadata \ Class.entryName).extract[String] + val uid = (metadata \ Uid.entryName).extract[String] + val timestamp = (metadata \ Timestamp.entryName).extract[Long] + val sparkVersion = (metadata \ SparkVersion.entryName).extract[String] + val params = metadata \ ParamMap.entryName + val defaultParams = metadata \ DefaultParamMap.entryName + if (expectedClassName.nonEmpty) { + require(className == expectedClassName, s"Error loading metadata: Expected class name" + + s" $expectedClassName but found class name $className") + } + // ****************************************************************************************** + /* + * Backward compatible fix for models trained with older versions of Spark (prior to 2.4.x). + * The change introduced in https://github.com/apache/spark/pull/20633 added serialization of + * default params, older models won't have them and fail to load. + */ + val defaultParamsFix = if (defaultParams == JNothing) JObject() else defaultParams + // ****************************************************************************************** + + new Metadata(className, uid, timestamp, sparkVersion, params, defaultParamsFix, metadata, metadataStr) + } /** * Extract Params from metadata, and set them in the instance. - * This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]]. 
- * TODO: Move to [[Metadata]] method + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. */ - def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata): Unit = - DefaultParamsReader.getAndSetParams(stage, metadata) + def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata, skipParams: Option[List[String]] = None): Unit = + metadata.getAndSetParams(stage, skipParams) } diff --git a/pom.xml b/pom.xml index 5fa90f9ffb..cd6dfff170 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,7 @@ ml.dmlc xgboost4j-spark - 0.81 + 0.90 compile @@ -167,18 +167,6 @@ 3.2.11 compile - - com.databricks - spark-avro_2.11 - 4.0.0 - compile - - - avro - org.apache.avro - - - com.fasterxml.jackson.dataformat jackson-dataformat-yaml @@ -200,7 +188,7 @@ com.twitter chill-avro_2.11 - 0.8.4 + 0.9.3 compile @@ -212,7 +200,7 @@ com.twitter chill-algebird_2.11 - 0.8.4 + 0.9.3 compile @@ -260,7 +248,7 @@ org.apache.avro avro - 1.7.7 + 1.8.2 compile @@ -272,13 +260,13 @@ ml.combust.mleap mleap-spark_2.11 - 0.13.0 + 0.14.0 compile ml.combust.mleap mleap-runtime_2.11 - 0.13.0 + 0.14.0 compile diff --git a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala index 031703e900..ab56e16cfb 100644 --- a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala +++ b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala @@ -30,10 +30,9 @@ package com.salesforce.op.readers -import com.databricks.spark.avro.SchemaConverters +import org.apache.spark.sql.avro.SchemaConverters import com.salesforce.op.OpParams import com.salesforce.op.utils.io.csv.{CSVInOut, CSVOptions, CSVToAvro} -import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericRecord import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.datasources.csv.CSVSchemaUtils @@ -74,10 +73,11 @@ class CSVAutoReader[T <: GenericRecord : ClassTag] val hdrsSet = hdrs.toSet val data = csvData.filter(_.exists(!hdrsSet.contains(_))) - val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options) - val builder = SchemaBuilder.record(recordName).namespace(recordNamespace) - val schema = SchemaConverters.convertStructToAvro(inferredSchema, builder, recordNamespace) - + val columnPrunning = spark.sessionState.conf.csvColumnPruning + val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options, columnPrunning) + val schema = SchemaConverters.toAvroType( + inferredSchema, nullable = false, recordName = recordName, nameSpace = recordNamespace + ) val avroData: RDD[T] = CSVToAvro.toAvroTyped[T](data, schema.toString, timeZone) maybeRepartition(avroData, params) } diff --git a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala index 8a633b9739..6d8b4a9593 100644 --- a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala +++ b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala @@ -43,18 +43,22 @@ case object CSVSchemaUtils { * 2. Merge row types to find common type * 3. 
Replace any null types with string type * - * @param rdd data - * @param header CSV header - * @param options CSV options + * @param rdd data + * @param header CSV header + * @param options CSV options + * @param columnPruning If it is set to true, column names of the requested schema are passed to CSV parser. + * Other column values can be ignored during parsing even if they are malformed. * @return inferred schema */ def infer( rdd: RDD[Array[String]], header: Seq[String], - options: com.salesforce.op.utils.io.csv.CSVOptions + options: com.salesforce.op.utils.io.csv.CSVOptions, + columnPruning: Boolean = true ): StructType = { val opts = new org.apache.spark.sql.execution.datasources.csv.CSVOptions( parameters = options.copy(header = false).toSparkCSVOptionsMap + ("inferSchema" -> true.toString), + columnPruning = columnPruning, defaultTimeZoneId = "GMT" ) CSVInferSchema.infer(rdd, header.toArray, opts) diff --git a/templates/simple/build.gradle.template b/templates/simple/build.gradle.template index a9bb81153d..fd70005fdd 100644 --- a/templates/simple/build.gradle.template +++ b/templates/simple/build.gradle.template @@ -1,15 +1,16 @@ buildscript { repositories { mavenCentral() + jcenter() maven { url "https://plugins.gradle.org/m2/" } } - //dependencies { + dependencies { + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' // classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' - //} + } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'com.github.johnrengelman.shadow' version '5.0.0' } @@ -79,8 +80,7 @@ dependencies { testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } - + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" } configurations.all { diff --git a/utils/build.gradle b/utils/build.gradle index f18024ec72..3915d616b6 100644 --- a/utils/build.gradle +++ b/utils/build.gradle @@ -7,7 +7,7 @@ dependencies { testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" // Jackson Yaml compile ("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") { exclude group: "com.fasterxml.jackson.core" } diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala index e02aaf6a3b..49ac6a07c5 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala @@ -95,7 +95,7 @@ object CSVToAvro { val value = if (index < columns.size) columns(index) else try { - field.defaultValue().asText() + field.defaultVal().toString } catch { case e: Exception => throw new InvalidParameterException("Mismatch number of fields in csv record and avro schema") diff --git a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala index dcf04ee337..1dfcb0b898 100644 --- 
a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala @@ -31,19 +31,22 @@ package com.salesforce.op.utils.json import org.json4s.CustomSerializer -import org.json4s.JsonAST.JString +import org.json4s.JsonAST.{JDouble, JString, JDecimal} /** * Json4s serializer for marshalling special Double values: NaN, -Infinity and Infinity */ // scalastyle:off -class SpecialDoubleSerializer extends CustomSerializer[Double](_ => +class SpecialDoubleSerializer extends CustomSerializer[Double](ser => ({ case JString("NaN") => Double.NaN case JString("-Infinity") => Double.NegativeInfinity case JString("Infinity") => Double.PositiveInfinity + case JDouble(v) => v }, { case v: Double if v.isNaN => JString("NaN") case Double.NegativeInfinity => JString("-Infinity") case Double.PositiveInfinity => JString("Infinity") + case v: Double if ser.wantsBigDecimal => JDecimal(v) + case v: Double => JDouble(v) })) diff --git a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala index 87ed5548ef..9138b04a12 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala @@ -32,7 +32,7 @@ package com.salesforce.op.utils.json import com.salesforce.op.test.TestCommon import org.json4s.jackson.JsonMethods._ -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -41,8 +41,6 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { - implicit val formats = DefaultFormats + new SpecialDoubleSerializer - val data = Map( "normal" -> Seq(-1.1, 0.0, 2.3), "infs" -> Seq(Double.NegativeInfinity, Double.PositiveInfinity), @@ -50,18 +48,28 @@ class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { "nan" -> Seq(Double.NaN) ) - val dataJson = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + Spec[SpecialDoubleSerializer] should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats + new SpecialDoubleSerializer) - Spec[SpecialDoubleSerializer] should "write double entries" in { - compact(Extraction.decompose(data)) shouldBe dataJson - } - it should "read double entries" in { - val parsed = parse(dataJson).extract[Map[String, Seq[Double]]] - parsed.keys shouldBe data.keys + Spec[SpecialDoubleSerializer] + " (with big decimal)" should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E+308,1.7976931348623157E+308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats.withBigDecimal + new SpecialDoubleSerializer) - parsed zip data foreach { - case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) - case ((_, a), (_, b)) => a should contain theSameElementsAs b + + def readWriteDoubleValues(input: Map[String, Seq[Double]])(json: String)(implicit formats: Formats): Unit = { + 
it should "write double entries" in { + compact(Extraction.decompose(input)) shouldBe json + } + it should "read double entries" in { + val parsed = parse(json).extract[Map[String, Seq[Double]]] + parsed.keys shouldBe input.keys + parsed zip input foreach { + case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) + case ((_, a), (_, b)) => a should contain theSameElementsAs b + } } } } From 485fcd54662e2b6f8087e0cd4546b383fb4a14e8 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 30 May 2019 13:48:57 -0700 Subject: [PATCH 02/45] Update to Spark 2.4.3 and XGBoost 0.90 --- build.gradle | 20 +++++++++---------- cli/build.gradle | 1 - .../com/salesforce/op/cli/SchemaSource.scala | 2 +- .../salesforce/op/cli/gen/AvroFieldTest.scala | 4 ++-- .../classification/OpXGBoostClassifier.scala | 4 ++-- .../xgboost4j/scala/spark/XGBoostParams.scala | 6 +++--- .../op/utils/spark/RichDataset.scala | 5 ++--- .../ml/SparkDefaultParamsReadWrite.scala | 16 +++++++++------ .../op/readers/CSVAutoReaders.scala | 12 +++++------ .../datasources/csv/CSVSchemaUtils.scala | 12 +++++++---- templates/simple/build.gradle.template | 11 +++++----- utils/build.gradle | 2 +- .../op/utils/io/csv/CSVToAvro.scala | 2 +- 13 files changed, 52 insertions(+), 45 deletions(-) diff --git a/build.gradle b/build.gradle index f1ec9672ae..23cc845637 100644 --- a/build.gradle +++ b/build.gradle @@ -1,15 +1,16 @@ buildscript { repositories { - maven { url "https://plugins.gradle.org/m2/" } mavenCentral() + jcenter() + maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'org.scoverage' version '2.5.0' id 'net.minecrell.licenser' version '0.4.1' id 'com.github.jk1.dependency-license-report' version '0.5.0' @@ -57,14 +58,13 @@ configure(allProjs) { scalaVersionRevision = '12' scalaTestVersion = '3.0.5' scalaCheckVersion = '1.14.0' - junitVersion = '4.11' - avroVersion = '1.7.7' - sparkVersion = '2.3.2' - sparkAvroVersion = '4.0.0' + junitVersion = '4.12' + avroVersion = '1.8.2' + sparkVersion = '2.4.3' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' - json4sVersion = '3.2.11' // matches Spark dependency version + json4sVersion = '3.5.3' // matches Spark dependency version jodaTimeVersion = '2.9.4' jodaConvertVersion = '1.8.1' algebirdVersion = '0.13.4' @@ -75,18 +75,18 @@ configure(allProjs) { googleLibPhoneNumberVersion = '8.8.5' googleGeoCoderVersion = '2.82' googleCarrierVersion = '1.72' - chillVersion = '0.8.4' + chillVersion = '0.9.3' reflectionsVersion = '0.9.11' collectionsVersion = '3.2.2' optimaizeLangDetectorVersion = '0.0.1' tikaVersion = '1.22' - sparkTestingBaseVersion = '2.3.1_0.10.0' + sparkTestingBaseVersion = '2.4.0_0.11.0' sourceCodeVersion = '0.1.3' pegdownVersion = '1.4.2' commonsValidatorVersion = '1.6' commonsIOVersion = '2.6' scoveragePluginVersion = '1.3.1' - xgboostVersion = '0.81' + xgboostVersion = '0.90' akkaSlf4jVersion = '2.3.11' mleapVersion = '0.13.0' memoryFilesystemVersion = '2.1.0' diff --git a/cli/build.gradle b/cli/build.gradle index 3062917744..bbd939e39f 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -77,7 +77,6 @@ task copyTemplates(type: Copy) { junitVersion: junitVersion, sparkVersion: sparkVersion, avroVersion: avroVersion, - sparkAvroVersion: sparkAvroVersion, hadoopVersion: hadoopVersion, collectionsVersion: 
collectionsVersion, transmogrifaiVersion: version, diff --git a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala index ee863c8a81..5ad27f866b 100644 --- a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala +++ b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala @@ -138,7 +138,7 @@ case class AutomaticSchema(recordClassName: String)(dataFile: File) extends Sche case Some(actualType) => val newSchema = Schema.create(actualType) val schemaField = - new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultValue) + new Schema.Field(field.name, newSchema, "auto-generated", orgSchemaField.defaultVal()) AvroField.from(schemaField) } } else field diff --git a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala index 80bce98cab..38a686a711 100644 --- a/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala +++ b/cli/src/test/scala/com/salesforce/op/cli/gen/AvroFieldTest.scala @@ -69,7 +69,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { val allSchemas = (enum::unions)++simpleSchemas // NULL does not work val fields = allSchemas.zipWithIndex map { - case (s, i) => new Schema.Field("x" + i, s, "Who", null) + case (s, i) => new Schema.Field("x" + i, s, "Who", null: Object) } val expected = List( @@ -86,7 +86,7 @@ class AvroFieldTest extends FlatSpec with TestCommon with Assertions { an[IllegalArgumentException] should be thrownBy { val nullSchema = Schema.create(Schema.Type.NULL) - val nullField = new Schema.Field("xxx", null, "Nobody", null) + val nullField = new Schema.Field("xxx", null, "Nobody", null: Object) AvroField from nullField } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala index 30f9801fb4..a114d2f62c 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala @@ -359,11 +359,11 @@ class OpXGBoostClassificationModel private lazy val model = getSparkMlStage().get private lazy val booster = model.nativeBooster - private lazy val treeLimit = model.getTreeLimit.toInt + private lazy val treeLimit = model.getTreeLimit private lazy val missing = model.getMissing override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { - val data = removeMissingValues(Iterator(features.value.asXGB), missing) + val data = processMissingValues(Iterator(features.value.asXGB), missing) val dm = new DMatrix(dataIter = data) val rawPred = booster.predict(dm, outPutMargin = true, treeLimit = treeLimit)(0).map(_.toDouble) val rawPrediction = if (model.numClasses == 2) Array(-rawPred(0), rawPred(0)) else rawPred diff --git a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala index 88077fa63f..3c8959997d 100644 --- a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala +++ b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala @@ -104,8 +104,8 @@ case object OpXGBoost { } /** - * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.removeMissingValues]] private method + * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.processMissingValues]] private 
method */ - def removeMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = - XGBoost.removeMissingValues(xgbLabelPoints, missing) + def processMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = + XGBoost.processMissingValues(xgbLabelPoints, missing) } diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala index 27ea60e30a..bc98909094 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala @@ -36,7 +36,6 @@ import com.salesforce.op.utils.text.TextUtils import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} -import com.databricks.spark.avro._ import org.apache.spark.ml.linalg.{Vector, Vectors} import scala.collection.mutable.{WrappedArray => MWrappedArray} @@ -72,7 +71,7 @@ object RichDataset { val schemaStr = spark.sparkContext.textFile(schemaPath(path)).collect().mkString val schema = DataType.fromJson(schemaStr).asInstanceOf[StructType] val origNames = schema.fields.map(_.metadata.getString(OriginalNameMetaKey)) - val data = spark.read.avro(dataPath(path)).toDF(origNames: _*) + val data = spark.read.format("avro").load(dataPath(path)).toDF(origNames: _*) val columns = for { (c, f) <- data.columns.zip(schema.fields) @@ -212,7 +211,7 @@ object RichDataset { val cleaned = ds.select(columns: _*) spark.sparkContext.parallelize(Seq(cleaned.schema.prettyJson), 1).saveAsTextFile(schemaPath(path)) - cleaned.write.mode(saveMode).options(options).avro(dataPath(path)) + cleaned.write.mode(saveMode).options(options).format("avro").save(dataPath(path)) } /** diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 8179b2b403..3c4590758a 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -79,15 +79,19 @@ case object SparkDefaultParamsReadWrite { * @param expectedClassName If non empty, this is checked against the loaded metadata. * @throws IllegalArgumentException if expectedClassName is specified and does not match metadata */ - def parseMetadata(jsonStr: String): Metadata = - DefaultParamsReader.parseMetadata(jsonStr) + def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = + DefaultParamsReader.parseMetadata(metadataStr) /** * Extract Params from metadata, and set them in the instance. - * This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]]. - * TODO: Move to [[Metadata]] method + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. 
*/
-  def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata): Unit =
-    DefaultParamsReader.getAndSetParams(stage, metadata)
+  def getAndSetParams(stage: OpPipelineStageBase, metadata: Metadata, skipParams: Option[List[String]] = None): Unit =
+    metadata.getAndSetParams(stage, skipParams)
 
 }
diff --git a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala
index 031703e900..ab56e16cfb 100644
--- a/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala
+++ b/readers/src/main/scala/com/salesforce/op/readers/CSVAutoReaders.scala
@@ -30,10 +30,9 @@
 
 package com.salesforce.op.readers
 
-import com.databricks.spark.avro.SchemaConverters
+import org.apache.spark.sql.avro.SchemaConverters
 import com.salesforce.op.OpParams
 import com.salesforce.op.utils.io.csv.{CSVInOut, CSVOptions, CSVToAvro}
-import org.apache.avro.SchemaBuilder
 import org.apache.avro.generic.GenericRecord
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.csv.CSVSchemaUtils
@@ -74,10 +73,11 @@ class CSVAutoReader[T <: GenericRecord : ClassTag]
     val hdrsSet = hdrs.toSet
     val data = csvData.filter(_.exists(!hdrsSet.contains(_)))
 
-    val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options)
-    val builder = SchemaBuilder.record(recordName).namespace(recordNamespace)
-    val schema = SchemaConverters.convertStructToAvro(inferredSchema, builder, recordNamespace)
-
+    val columnPruning = spark.sessionState.conf.csvColumnPruning
+    val inferredSchema = CSVSchemaUtils.infer(data.map(_.toArray), hdrs, options, columnPruning)
+    val schema = SchemaConverters.toAvroType(
+      inferredSchema, nullable = false, recordName = recordName, nameSpace = recordNamespace
+    )
     val avroData: RDD[T] = CSVToAvro.toAvroTyped[T](data, schema.toString, timeZone)
     maybeRepartition(avroData, params)
   }
diff --git a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala
index 8a633b9739..6d8b4a9593 100644
--- a/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala
+++ b/readers/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVSchemaUtils.scala
@@ -43,18 +43,22 @@ case object CSVSchemaUtils {
    *  2. Merge row types to find common type
    *  3. Replace any null types with string type
    *
-   * @param rdd     data
-   * @param header  CSV header
-   * @param options CSV options
+   * @param rdd           data
+   * @param header        CSV header
+   * @param options       CSV options
+   * @param columnPruning If it is set to true, column names of the requested schema are passed to CSV parser.
+   *                      Other column values can be ignored during parsing even if they are malformed.
* @return inferred schema */ def infer( rdd: RDD[Array[String]], header: Seq[String], - options: com.salesforce.op.utils.io.csv.CSVOptions + options: com.salesforce.op.utils.io.csv.CSVOptions, + columnPruning: Boolean = true ): StructType = { val opts = new org.apache.spark.sql.execution.datasources.csv.CSVOptions( parameters = options.copy(header = false).toSparkCSVOptionsMap + ("inferSchema" -> true.toString), + columnPruning = columnPruning, defaultTimeZoneId = "GMT" ) CSVInferSchema.infer(rdd, header.toArray, opts) diff --git a/templates/simple/build.gradle.template b/templates/simple/build.gradle.template index a9bb81153d..5e0aaa3b65 100644 --- a/templates/simple/build.gradle.template +++ b/templates/simple/build.gradle.template @@ -1,15 +1,16 @@ buildscript { repositories { mavenCentral() + jcenter() maven { url "https://plugins.gradle.org/m2/" } } - //dependencies { + dependencies { + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' // classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' - //} + } } plugins { - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'com.github.johnrengelman.shadow' version '5.0.0' } @@ -78,8 +79,8 @@ dependencies { compileOnly("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } - // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } + // Spark Avro + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" } diff --git a/utils/build.gradle b/utils/build.gradle index f18024ec72..3915d616b6 100644 --- a/utils/build.gradle +++ b/utils/build.gradle @@ -7,7 +7,7 @@ dependencies { testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' } // Spark Avro - compile ("com.databricks:spark-avro_$scalaVersion:$sparkAvroVersion") { exclude group: "org.apache.avro", module: "avro" } + compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion" // Jackson Yaml compile ("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") { exclude group: "com.fasterxml.jackson.core" } diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala index e02aaf6a3b..49ac6a07c5 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/io/csv/CSVToAvro.scala @@ -95,7 +95,7 @@ object CSVToAvro { val value = if (index < columns.size) columns(index) else try { - field.defaultValue().asText() + field.defaultVal().toString } catch { case e: Exception => throw new InvalidParameterException("Mismatch number of fields in csv record and avro schema") From 12f533395385a6aaab37344a2a2e3e6168281415 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 30 May 2019 14:09:07 -0700 Subject: [PATCH 03/45] special double serializer fix --- .../com/salesforce/op/utils/json/SpecialDoubleSerializer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala index dcf04ee337..787631a70c 100644 --- 
a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala @@ -31,7 +31,7 @@ package com.salesforce.op.utils.json import org.json4s.CustomSerializer -import org.json4s.JsonAST.JString +import org.json4s.JsonAST.{JDouble, JString} /** * Json4s serializer for marshalling special Double values: NaN, -Infinity and Infinity @@ -42,6 +42,7 @@ class SpecialDoubleSerializer extends CustomSerializer[Double](_ => case JString("NaN") => Double.NaN case JString("-Infinity") => Double.NegativeInfinity case JString("Infinity") => Double.PositiveInfinity + case JDouble(v) => v }, { case v: Double if v.isNaN => JString("NaN") case Double.NegativeInfinity => JString("-Infinity") From a4ee98653c535199a84f1c0bc9928cc7f7dc2ba7 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 30 May 2019 14:49:19 -0700 Subject: [PATCH 04/45] fix serialization --- .../ml/SparkDefaultParamsReadWrite.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 3c4590758a..32572b844a 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -47,28 +47,34 @@ case object SparkDefaultParamsReadWrite { * @see [[OpPipelineStageWriter]] for details on what this includes. */ def getMetadataToSave( - stage: OpPipelineStageBase, + instance: OpPipelineStageBase, extraMetadata: Option[JObject] = None, paramMap: Option[JValue] = None - ): JObject = { - val uid = stage.uid - val cls = stage.getClass.getName - val params = stage.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] + ): String = { + val uid = instance.uid + val cls = instance.getClass.getName + val params = instance.paramMap.toSeq + val defaultParams = instance.defaultParamMap.toSeq val jsonParams = paramMap.getOrElse(render(params.map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList)) + val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, v) => + p.name -> parse(p.jsonEncode(v)) + }.toList) val basicMetadata = ("class" -> cls) ~ ("timestamp" -> System.currentTimeMillis()) ~ ("sparkVersion" -> org.apache.spark.SPARK_VERSION) ~ ("uid" -> uid) ~ - ("paramMap" -> jsonParams) + ("paramMap" -> jsonParams) ~ + ("defaultParamMap" -> jsonDefaultParams) val metadata = extraMetadata match { case Some(jObject) => basicMetadata ~ jObject case None => basicMetadata } - metadata + val metadataJson: String = compact(render(metadata)) + metadataJson } /** From 47703fa6f9e76270aed447540d6d3568330f69e6 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 30 May 2019 15:24:51 -0700 Subject: [PATCH 05/45] fix serialization --- .../ml/SparkDefaultParamsReadWrite.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 32572b844a..b1039a2fa1 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -85,8 +85,31 @@ case object SparkDefaultParamsReadWrite { * @param expectedClassName If non empty, this is checked against 
the loaded metadata. * @throws IllegalArgumentException if expectedClassName is specified and does not match metadata */ - def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = - DefaultParamsReader.parseMetadata(metadataStr) + def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = { + val metadata = parse(metadataStr) + + implicit val format = DefaultFormats + val className = (metadata \ "class").extract[String] + val uid = (metadata \ "uid").extract[String] + val timestamp = (metadata \ "timestamp").extract[Long] + val sparkVersion = (metadata \ "sparkVersion").extract[String] + val params = metadata \ "paramMap" + val defaultParams = metadata \ "defaultParamMap" + if (expectedClassName.nonEmpty) { + require(className == expectedClassName, s"Error loading metadata: Expected class name" + + s" $expectedClassName but found class name $className") + } + // ****************************************************************************************** + /** + * Backward compatible fix for models trained with older versions of Spark (prior to 2.4.x). + * The change introduced in https://github.com/apache/spark/pull/20633 added serialization of + * default params, older models won't have them and fail to load. + */ + val defaultParamsFix = if (defaultParams == JNothing) JObject() else defaultParams + // ****************************************************************************************** + + new Metadata(className, uid, timestamp, sparkVersion, params, defaultParamsFix, metadata, metadataStr) + } /** * Extract Params from metadata, and set them in the instance. From ea4b11f6facfb3d90017746fd44866cfcf799757 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 30 May 2019 15:26:43 -0700 Subject: [PATCH 06/45] docs --- .../org/apache/spark/ml/SparkDefaultParamsReadWrite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index b1039a2fa1..bfd932298f 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -81,6 +81,9 @@ case object SparkDefaultParamsReadWrite { * Parse metadata JSON string produced by [[DefaultParamsWriter.getMetadataToSave()]]. * This is a helper function for [[loadMetadata()]]. * + * Note: this method was taken from DefaultParamsWriter.parseMetadata, + * but modified to avoid failing on loading of Spark models prior to 2.4.x + * * @param metadataStr JSON string of metadata * @param expectedClassName If non empty, this is checked against the loaded metadata. 
* @throws IllegalArgumentException if expectedClassName is specified and does not match metadata

From ab81e318cacb877ed83e9752726b0d54b503f129 Mon Sep 17 00:00:00 2001
From: Christopher Suchanek
Date: Thu, 30 May 2019 16:08:39 -0700
Subject: [PATCH 07/45] fixed missing value for test

---
 core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala
index 7331b91dd5..0cd14dd341 100644
--- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala
+++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala
@@ -76,8 +76,8 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou
     val lrParams = new ParamGridBuilder().addGrid(lr.regParam, Array(0.01, 0.1)).build()
     val models = Seq(lr -> lrParams).asInstanceOf[Seq[(EstimatorType, Array[ParamMap])]]
 
-    val xgbClassifier = new OpXGBoostClassifier().setSilent(1).setSeed(42L)
-    val xgbRegressor = new OpXGBoostRegressor().setSilent(1).setSeed(42L)
+    val xgbClassifier = new OpXGBoostClassifier().setMissing(0.0f).setSilent(1).setSeed(42L)
+    val xgbRegressor = new OpXGBoostRegressor().setMissing(0.0f).setSilent(1).setSeed(42L)
     val xgbClassifierPred = xgbClassifier.setInput(label, features).getOutput()
     val xgbRegressorPred = xgbRegressor.setInput(label, features).getOutput()
     lazy val xgbWorkflow =

From c017869289cca66f7eaf314d7f31685d67141d02 Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Thu, 30 May 2019 16:20:49 -0700
Subject: [PATCH 08/45] meta fix

---
 .../com/salesforce/op/stages/OpPipelineStageReader.scala  | 7 ++++---
 .../org/apache/spark/ml/SparkDefaultParamsReadWrite.scala | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala
index 8e0eea4087..4b1a7d1e87 100644
--- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala
+++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReader.scala
@@ -122,9 +122,10 @@ final class OpPipelineStageReader private
 
     // Update [[SparkWrapperParams]] with path so we can load the [[SparkStageParam]] instance
     val updatedMetadata = stage match {
-      case _: SparkWrapperParams[_] =>
-        val updatedParams = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path)
-        metadata.copy(params = updatedParams)
+      case _: SparkWrapperParams[_] => metadata.copy(
+        params = SparkStageParam.updateParamsMetadataWithPath(metadata.params, path),
+        defaultParams = SparkStageParam.updateParamsMetadataWithPath(metadata.defaultParams, path)
+      )
       case _ => metadata
     }
 
diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala
index bfd932298f..d86c199d84 100644
--- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala
+++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala
@@ -81,7 +81,7 @@ case object SparkDefaultParamsReadWrite {
    * Parse metadata JSON string produced by [[DefaultParamsWriter.getMetadataToSave()]].
    * This is a helper function for [[loadMetadata()]].
* - * Note: this method was taken from DefaultParamsWriter.parseMetadata, + * Note: this method was taken from [[DefaultParamsWriter.parseMetadata]], * but modified to avoid failing on loading of Spark models prior to 2.4.x * * @param metadataStr JSON string of metadata From 99ea7e1dc8b3352350d7805c74e42f9e5edf7062 Mon Sep 17 00:00:00 2001 From: Kevin Moore Date: Thu, 30 May 2019 23:23:52 -0700 Subject: [PATCH 09/45] Updated DecisionTreeNumericMapBucketizer test to deal with the change made to decision tree pruning in Spark 2.4. If nodes are split, but both child nodes lead to the same prediction then the split is pruned away. This updates the test so this doesn't happen for feature 'b' --- .../impl/feature/DecisionTreeNumericMapBucketizerTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala index 6b4fc3105b..10e368c7c9 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericMapBucketizerTest.scala @@ -53,7 +53,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, val (inputData, estimator) = { val numericData = Seq( - Map("a" -> 1.0), + Map("a" -> 1.0, "b" -> 1.0), Map("a" -> 18.0), Map("b" -> 0.0), Map("a" -> -1.23, "b" -> 1.0), @@ -66,7 +66,7 @@ class DecisionTreeNumericMapBucketizerTest extends OpEstimatorSpec[OPVector, } val expectedResult = Seq( - Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), + Vectors.sparse(7, Array(1, 4, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(1, 5, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(2, 3, 6), Array(1.0, 1.0, 1.0)), Vectors.sparse(7, Array(0, 4, 6), Array(1.0, 1.0, 1.0)), From daa2672cbc820311fef8f904b110104939044a99 Mon Sep 17 00:00:00 2001 From: Matthew Date: Fri, 31 May 2019 08:53:09 -0700 Subject: [PATCH 10/45] fix params meta test --- .../OpPipelineStageReaderWriterTest.scala | 17 +++++++++---- .../ml/SparkDefaultParamsReadWrite.scala | 25 ++++++++++--------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala index 8791277d9e..50019561a2 100644 --- a/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/OpPipelineStageReaderWriterTest.scala @@ -70,8 +70,8 @@ private[stages] abstract class OpPipelineStageReaderWriterTest it should "write class name" in { (stageJson \ FN.Class.entryName).extract[String] shouldBe stage.getClass.getName } - it should "write paramMap" in { - val params = (stageJson \ FN.ParamMap.entryName).extract[Map[String, Any]] + it should "write params map" in { + val params = extractParams(stageJson).extract[Map[String, Any]] if (hasOutputName) { params should have size 4 params.keys shouldBe Set("inputFeatures", "outputMetadata", "inputSchema", "outputFeatureName") @@ -81,17 +81,18 @@ private[stages] abstract class OpPipelineStageReaderWriterTest } } it should "write outputMetadata" in { - val metadataStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "outputMetadata")) + val params = extractParams(stageJson) + val metadataStr = 
compact(render(extractParams(stageJson) \ "outputMetadata")) val metadata = Metadata.fromJson(metadataStr) metadata shouldBe stage.getMetadata() } it should "write inputSchema" in { - val schemaStr = compact(render((stageJson \ FN.ParamMap.entryName) \ "inputSchema")) + val schemaStr = compact(render(extractParams(stageJson) \ "inputSchema")) val schema = DataType.fromJson(schemaStr) schema shouldBe stage.getInputSchema() } it should "write input features" in { - val jArray = ((stageJson \ FN.ParamMap.entryName) \ "inputFeatures").extract[JArray] + val jArray = (extractParams(stageJson) \ "inputFeatures").extract[JArray] jArray.values should have length expectedFeaturesLength val obj = jArray(0).extract[JObject] obj.values.keys shouldBe Set("name", "isResponse", "isRaw", "uid", "typeName", "stages", "originFeatures") @@ -118,4 +119,10 @@ private[stages] abstract class OpPipelineStageReaderWriterTest stageLoaded.getInputSchema() shouldBe stage.getInputSchema() } + private def extractParams(stageJson: JValue): JValue = { + val defaultParamsMap = stageJson \ FN.DefaultParamMap.entryName + val paramsMap = stageJson \ FN.ParamMap.entryName + defaultParamsMap.merge(paramsMap) + } + } diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index d86c199d84..7dac65040f 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -21,6 +21,7 @@ package org.apache.spark.ml import com.salesforce.op.stages.OpPipelineStageBase +import com.salesforce.op.stages.OpPipelineStageReadWriteShared.FieldNames._ import org.apache.spark.ml.param.ParamPair import org.apache.spark.ml.util.{DefaultParamsReader, DefaultParamsWriter} import org.json4s.JsonDSL._ @@ -61,12 +62,12 @@ case object SparkDefaultParamsReadWrite { val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList) - val basicMetadata = ("class" -> cls) ~ - ("timestamp" -> System.currentTimeMillis()) ~ - ("sparkVersion" -> org.apache.spark.SPARK_VERSION) ~ - ("uid" -> uid) ~ - ("paramMap" -> jsonParams) ~ - ("defaultParamMap" -> jsonDefaultParams) + val basicMetadata = (Class.entryName -> cls) ~ + (Timestamp.entryName -> System.currentTimeMillis()) ~ + (SparkVersion.entryName -> org.apache.spark.SPARK_VERSION) ~ + (Uid.entryName -> uid) ~ + (ParamMap.entryName -> jsonParams) ~ + (DefaultParamMap.entryName -> jsonDefaultParams) val metadata = extraMetadata match { case Some(jObject) => basicMetadata ~ jObject @@ -92,12 +93,12 @@ case object SparkDefaultParamsReadWrite { val metadata = parse(metadataStr) implicit val format = DefaultFormats - val className = (metadata \ "class").extract[String] - val uid = (metadata \ "uid").extract[String] - val timestamp = (metadata \ "timestamp").extract[Long] - val sparkVersion = (metadata \ "sparkVersion").extract[String] - val params = metadata \ "paramMap" - val defaultParams = metadata \ "defaultParamMap" + val className = (metadata \ Class.entryName).extract[String] + val uid = (metadata \ Uid.entryName).extract[String] + val timestamp = (metadata \ Timestamp.entryName).extract[Long] + val sparkVersion = (metadata \ SparkVersion.entryName).extract[String] + val params = metadata \ ParamMap.entryName + val defaultParams = metadata \ DefaultParamMap.entryName if (expectedClassName.nonEmpty) { require(className == 
expectedClassName, s"Error loading metadata: Expected class name" +
        s" $expectedClassName but found class name $className")

From 7d3ebb7344e5c5dc791770eecc6bed89b095a23a Mon Sep 17 00:00:00 2001
From: Christopher Suchanek
Date: Fri, 31 May 2019 09:09:56 -0700
Subject: [PATCH 11/45] Fixed failing xgboost test

---
 .../classification/OpClassifierModelTest.scala    | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala
index ab8168cd6c..759d542b18 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala
@@ -134,27 +134,22 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext with OpXGBoos
       .setLabelCol(labelF.name)
     val spk = cl.fit(rawDF)
     val op = toOP(spk, spk.uid).setInput(labelF, featureV)
-    compareOutputs(spk.transform(rawDF), op.transform(rawDF), false)
+    compareOutputs(spk.transform(rawDF), op.transform(rawDF))
   }
 
-  def compareOutputs
-  (
-    df1: DataFrame,
-    df2: DataFrame,
-    fullRawPred: Boolean = true
-  )(implicit arrayEquality: Equality[Array[Double]]): Unit = {
+  def compareOutputs(df1: DataFrame, df2: DataFrame)(implicit arrayEquality: Equality[Array[Double]]): Unit = {
     def keysStartsWith(name: String, value: Map[String, Double]): Array[Double] = {
       val names = value.keys.filter(_.startsWith(name)).toArray.sorted
       names.map(value)
     }
+
     val sorted1 = df1.collect().sortBy(_.getAs[Double](4))
     val sorted2 = df2.collect().sortBy(_.getAs[Map[String, Double]](2)(Prediction.Keys.PredictionName))
-    sorted1.zip(sorted2).foreach{ case (r1, r2) =>
+    sorted1.zip(sorted2).foreach { case (r1, r2) =>
       val map = r2.getAs[Map[String, Double]](2)
       r1.getAs[Double](4) shouldEqual map(Prediction.Keys.PredictionName)
       r1.getAs[Vector](3).toArray shouldEqual keysStartsWith(Prediction.Keys.ProbabilityName, map)
-      if (fullRawPred) r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map)
-      else r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map).tail
+      r1.getAs[Vector](2).toArray shouldEqual keysStartsWith(Prediction.Keys.RawPredictionName, map)
     }
   }
 

From 56616407b03aa62de426d226b5a9213b24f41f4c Mon Sep 17 00:00:00 2001
From: Matthew
Date: Fri, 31 May 2019 10:10:00 -0700
Subject: [PATCH 12/45] indent

---
 templates/simple/build.gradle.template | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/templates/simple/build.gradle.template b/templates/simple/build.gradle.template
index 5e0aaa3b65..fd70005fdd 100644
--- a/templates/simple/build.gradle.template
+++ b/templates/simple/build.gradle.template
@@ -79,9 +79,8 @@ dependencies {
   compileOnly("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' }
   testCompile("org.apache.avro:avro-mapred:$avroVersion:$hadoopVersion") { exclude group: 'org.mortbay.jetty', module: 'servlet-api' }
 
-    // Spark Avro
-    compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion"
-
+  // Spark Avro
+  compile "org.apache.spark:spark-avro_$scalaVersion:$sparkVersion"
 }
 
 configurations.all {

From 8852d69056ce3be676f3c1686109beb7d340fa65 Mon Sep 17 00:00:00 2001
From: Matthew
Date: Fri, 31 May 2019 13:03:16 -0700
Subject: [PATCH 13/45] cleanup

---
 cli/build.gradle
| 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cli/build.gradle b/cli/build.gradle index bbd939e39f..3d6e9ffd9c 100644 --- a/cli/build.gradle +++ b/cli/build.gradle @@ -69,7 +69,6 @@ task copyTemplates(type: Copy) { fileName.replace(".gradle.template", ".gradle") } expand([ - databaseHostname: 'db.company.com', version: scalaVersion, scalaVersion: scalaVersion, scalaVersionRevision: scalaVersionRevision, @@ -79,9 +78,7 @@ task copyTemplates(type: Copy) { avroVersion: avroVersion, hadoopVersion: hadoopVersion, collectionsVersion: collectionsVersion, - transmogrifaiVersion: version, - buildNumber: (int)(Math.random() * 1000), - date: new Date() + transmogrifaiVersion: version ]) } From 2ab8924654da4b393d56718ba75c7e7f225c268b Mon Sep 17 00:00:00 2001 From: Matthew Date: Mon, 3 Jun 2019 13:49:37 -0700 Subject: [PATCH 14/45] added dataframe reader and writer extensions --- .../op/utils/spark/RichDataset.scala | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala index bc98909094..4eacda8ce0 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala @@ -59,6 +59,32 @@ object RichDataset { private[op] def schemaPath(path: String): String = s"${path.stripSuffix("/")}/schema" private[op] def dataPath(path: String): String = s"${path.stripSuffix("/")}/data" + implicit class RichDataFrameWriter[T](w: DataFrameWriter[T]) { + + /** + * Saves the content of the `DataFrame` in Avro format at the specified path. + * This is equivalent to: + * {{{ + * format("avro").save(path) + * }}} + */ + def avro(path: String): Unit = w.format("avro").save(path) + + } + + implicit class RichDataFrameReader(r: DataFrameReader) { + + /** + * Loads Avro files and returns the result as a `DataFrame`. + * This is equivalent to: + * {{{ + * format("avro").load(path) + * }}} + */ + def avro(path: String): DataFrame = r.format("avro").load(path) + + } + /** * Loads a dataframe from a saved Avro file and dataframe schema file generated by RichDataFrame.saveAvro. 
* Relies on spark-avro package for Avro file generation, which seems to have a bug/feature that makes all fields @@ -71,7 +97,7 @@ object RichDataset { val schemaStr = spark.sparkContext.textFile(schemaPath(path)).collect().mkString val schema = DataType.fromJson(schemaStr).asInstanceOf[StructType] val origNames = schema.fields.map(_.metadata.getString(OriginalNameMetaKey)) - val data = spark.read.format("avro").load(dataPath(path)).toDF(origNames: _*) + val data = spark.read.avro(dataPath(path)).toDF(origNames: _*) val columns = for { (c, f) <- data.columns.zip(schema.fields) @@ -83,7 +109,6 @@ object RichDataset { data.select(columns: _*) } - /** * A dataframe with three quantifiers: forall, exists, and forNone (see below) * the rest of extended functionality comes from RichDataset @@ -211,7 +236,7 @@ object RichDataset { val cleaned = ds.select(columns: _*) spark.sparkContext.parallelize(Seq(cleaned.schema.prettyJson), 1).saveAsTextFile(schemaPath(path)) - cleaned.write.mode(saveMode).options(options).format("avro").save(dataPath(path)) + cleaned.write.mode(saveMode).options(options).avro(dataPath(path)) } /** From 7c8b988741b478ad027697949cb0112155727fa5 Mon Sep 17 00:00:00 2001 From: Matthew Date: Mon, 3 Jun 2019 13:50:42 -0700 Subject: [PATCH 15/45] added const --- .../com/salesforce/op/utils/spark/RichDataset.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala index 4eacda8ce0..703b61414f 100644 --- a/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala +++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichDataset.scala @@ -33,12 +33,13 @@ package com.salesforce.op.utils.spark import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureLike, FeatureSparkTypes, OPFeature} import com.salesforce.op.utils.text.TextUtils +import org.apache.avro.mapred.AvroInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.ml.linalg.{Vector, Vectors} -import scala.collection.mutable.{WrappedArray => MWrappedArray} +import scala.collection.mutable.{WrappedArray => MWrappedArray} import scala.reflect.ClassTag @@ -59,6 +60,8 @@ object RichDataset { private[op] def schemaPath(path: String): String = s"${path.stripSuffix("/")}/schema" private[op] def dataPath(path: String): String = s"${path.stripSuffix("/")}/data" + private val AvroFormat = "avro" + implicit class RichDataFrameWriter[T](w: DataFrameWriter[T]) { /** @@ -68,7 +71,7 @@ object RichDataset { * format("avro").save(path) * }}} */ - def avro(path: String): Unit = w.format("avro").save(path) + def avro(path: String): Unit = w.format(AvroFormat).save(path) } @@ -81,7 +84,7 @@ object RichDataset { * format("avro").load(path) * }}} */ - def avro(path: String): DataFrame = r.format("avro").load(path) + def avro(path: String): DataFrame = r.format(AvroFormat).load(path) } From d98c8a9328351854fb153dc3fe3b9adab8277de5 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Mon, 14 Oct 2019 13:54:21 -0700 Subject: [PATCH 16/45] cherrypick fixes --- .../com/salesforce/op/stages/OpPipelineStageReaderWriter.scala | 3 +++ .../org/apache/spark/ml/SparkDefaultParamsReadWrite.scala | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git 
a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala index ef63f5d50c..aec70ef9df 100644 --- a/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala +++ b/features/src/main/scala/com/salesforce/op/stages/OpPipelineStageReaderWriter.scala @@ -171,6 +171,9 @@ object OpPipelineStageReaderWriter extends OpPipelineStageReadWriteFormats { case object Uid extends FieldNames("uid") case object Class extends FieldNames("class") case object ParamMap extends FieldNames("paramMap") + case object DefaultParamMap extends FieldNames("defaultParamMap") + case object Timestamp extends FieldNames("timestamp") + case object SparkVersion extends FieldNames("sparkVersion") } /** diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 7dac65040f..d1d00198ff 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -21,7 +21,7 @@ package org.apache.spark.ml import com.salesforce.op.stages.OpPipelineStageBase -import com.salesforce.op.stages.OpPipelineStageReadWriteShared.FieldNames._ +import com.salesforce.op.stages.OpPipelineStageReaderWriter.FieldNames._ import org.apache.spark.ml.param.ParamPair import org.apache.spark.ml.util.{DefaultParamsReader, DefaultParamsWriter} import org.json4s.JsonDSL._ From cce0d8ff7a071777514c564c4444a7ba9a0d5a36 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 20 Jun 2019 21:49:57 -0700 Subject: [PATCH 17/45] added xgboost params + update models to use public predict method --- .../impl/classification/OpLinearSVC.scala | 4 +-- .../classification/OpXGBoostClassifier.scala | 22 ++++++++++++++++ .../regression/OpDecisionTreeRegressor.scala | 6 +---- .../impl/regression/OpGBTRegressor.scala | 6 +---- .../impl/regression/OpLinearRegression.scala | 6 +---- .../regression/OpRandomForestRegressor.scala | 7 +---- .../impl/regression/OpXGBoostRegressor.scala | 26 ++++++++++++++++--- .../specific/OpPredictionModel.scala | 7 ++--- .../ml/SparkDefaultParamsReadWrite.scala | 3 +-- 9 files changed, 55 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala index 425d43a866..1275e3d163 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala @@ -152,14 +152,14 @@ class OpLinearSVCModel ) extends OpPredictorWrapperModel[LinearSVCModel](uid = uid, operationName = operationName, sparkModel = sparkModel) { @transient lazy private val predictRaw = reflectMethod(getSparkMlStage().get, "predictRaw") - @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy private val predict: Vector => Double = getSparkMlStage().get.predict(_) /** * Function used to convert input to output */ override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { val raw = predictRaw(features.value).asInstanceOf[Vector] - val pred = predict(features.value).asInstanceOf[Double] + val pred = predict(features.value) Prediction(rawPrediction = raw, prediction = pred) } diff --git 
a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala index a114d2f62c..3c07a58787 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala @@ -235,6 +235,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + /** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -282,8 +287,19 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -310,6 +326,11 @@ class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. 
default: null */ @@ -370,6 +391,7 @@ class OpXGBoostClassificationModel val prob = booster.predict(dm, outPutMargin = false, treeLimit = treeLimit)(0).map(_.toDouble) val probability = if (model.numClasses == 2) Array(1.0 - prob(0), prob(0)) else prob val prediction = probability2predictionMirror(Vectors.dense(probability)).asInstanceOf[Double] + Prediction(prediction = prediction, rawPrediction = rawPrediction, probability = probability) } } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala index f42e1e50ed..39a5735949 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor, OpDecisionTreeRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -113,7 +112,4 @@ class OpDecisionTreeRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[DecisionTreeRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala index a8d69c9f14..b5717b49a4 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor, OpGBTRegressorParams} import scala.reflect.runtime.universe.TypeTag @@ -139,7 +138,4 @@ class OpGBTRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[GBTRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala index 780a496b60..e0da705c9d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpLinearRegression.scala @@ -34,7 +34,6 @@ import com.salesforce.op._ import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import 
com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, OpLinearRegressionParams} import scala.reflect.runtime.universe.TypeTag @@ -180,7 +179,4 @@ class OpLinearRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[LinearRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala index 4b1aca8265..4b0fdbd1d5 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.regression.{OpRandomForestRegressorParams, RandomForestRegressionModel, RandomForestRegressor} import scala.reflect.runtime.universe.TypeTag @@ -126,8 +125,4 @@ class OpRandomForestRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[RandomForestRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} - - +) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala index 688f34f812..8e2eaaf49d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala @@ -34,7 +34,6 @@ import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} -import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait} import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostRegressorParams, TrackerConf, XGBoostRegressionModel, XGBoostRegressor} @@ -234,6 +233,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setMaxBins(value: Int): this.type = set(maxBins, value) + /** + * Maximum number of nodes to be added. Only relevant when grow_policy=lossguide is set. + */ + def setMaxLeaves(value: Int): this.type = set(maxLeaves, value) + /** * This is only used for approximate greedy algorithm. * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select @@ -281,8 +285,19 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) def setLambdaBias(value: Double): this.type = set(lambdaBias, value) // setters for learning params + + /** + * Specify the learning task and the corresponding learning objective. 
+ * options: reg:squarederror, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:squarederror + */ def setObjective(value: String): this.type = set(objective, value) + /** + * Objective type used for training. For options see [[ml.dmlc.xgboost4j.scala.spark.params.LearningTaskParams]] + */ + def setObjectiveType(value: String): this.type = set(objectiveType, value) + /** * Specify the learning task and the corresponding learning objective. * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, @@ -309,6 +324,11 @@ class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) */ def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + /** + * Define the expected optimization to the evaluation metrics, true to maximize otherwise minimize it + */ + def setMaximizeEvaluationMetrics(value: Boolean): this.type = set(maximizeEvaluationMetrics, value) + /** * Customized objective function provided by user. default: null */ @@ -341,6 +361,4 @@ class OpXGBoostRegressionModel ttov: TypeTag[Prediction#Value] ) extends OpPredictionModel[XGBoostRegressionModel]( sparkModel = sparkModel, uid = uid, operationName = operationName -) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") -} +) diff --git a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala index cfcaae7278..bc59b13ba8 100644 --- a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala +++ b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala @@ -52,9 +52,10 @@ abstract class OpPredictionModel[T <: PredictionModel[Vector, T]] operationName: String ) extends OpPredictorWrapperModel[T](uid = uid, operationName = operationName, sparkModel = sparkModel) { - protected def predictMirror: MethodMirror - - protected def predict(features: Vector): Double = predictMirror.apply(features).asInstanceOf[Double] + /** + * Predict label for the given features + */ + @transient protected lazy val predict: Vector => Double = getSparkMlStage().get.predict(_) /** * Function used to convert input to output diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index d1d00198ff..1b6d9d674b 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -92,7 +92,6 @@ case object SparkDefaultParamsReadWrite { def parseMetadata(metadataStr: String, expectedClassName: String = ""): Metadata = { val metadata = parse(metadataStr) - implicit val format = DefaultFormats val className = (metadata \ Class.entryName).extract[String] val uid = (metadata \ Uid.entryName).extract[String] val timestamp = (metadata \ Timestamp.entryName).extract[Long] @@ -104,7 +103,7 @@ case object SparkDefaultParamsReadWrite { s" $expectedClassName but found class name $className") } // ****************************************************************************************** - /** + /* * Backward compatible fix for models trained with older versions of Spark (prior to 2.4.x). 
* The change introduced in https://github.com/apache/spark/pull/20633 added serialization of * default params, older models won't have them and fail to load. From 8afbae7917efd612dc0083e41f6a6d0835e13ec8 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 20 Jun 2019 22:40:41 -0700 Subject: [PATCH 18/45] blarg --- .../test/scala/com/salesforce/op/ModelInsightsTest.scala | 4 ++-- .../salesforce/op/utils/json/SpecialDoubleSerializer.scala | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala index 0cd14dd341..1e49ee7e11 100644 --- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala +++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala @@ -44,6 +44,7 @@ import com.salesforce.op.stages.impl.tuning.{DataCutter, DataSplitter} import com.salesforce.op.test.{PassengerSparkFixtureTest, TestFeatureBuilder} import com.salesforce.op.testkit.RandomReal import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} +import ml.dmlc.xgboost4j.scala.spark.OpXGBoostQuietLogging import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.ParamGridBuilder import org.junit.runner.RunWith @@ -59,7 +60,7 @@ import org.apache.spark.sql.functions._ import scala.util.{Failure, Success} @RunWith(classOf[JUnitRunner]) -class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality { +class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with DoubleEquality with OpXGBoostQuietLogging { private val density = weight / height private val generVec = genderPL.vectorize(topK = 10, minSupport = 1, cleanText = true) @@ -447,7 +448,6 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou } it should "correctly serialize and deserialize from json when raw feature filter is used" in { - val insights = modelWithRFF.modelInsights(predWithMaps) ModelInsights.fromJson(insights.toJson()) match { case Failure(e) => fail(e) diff --git a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala index 787631a70c..1dfcb0b898 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/json/SpecialDoubleSerializer.scala @@ -31,13 +31,13 @@ package com.salesforce.op.utils.json import org.json4s.CustomSerializer -import org.json4s.JsonAST.{JDouble, JString} +import org.json4s.JsonAST.{JDouble, JString, JDecimal} /** * Json4s serializer for marshalling special Double values: NaN, -Infinity and Infinity */ // scalastyle:off -class SpecialDoubleSerializer extends CustomSerializer[Double](_ => +class SpecialDoubleSerializer extends CustomSerializer[Double](ser => ({ case JString("NaN") => Double.NaN case JString("-Infinity") => Double.NegativeInfinity @@ -47,4 +47,6 @@ class SpecialDoubleSerializer extends CustomSerializer[Double](_ => case v: Double if v.isNaN => JString("NaN") case Double.NegativeInfinity => JString("-Infinity") case Double.PositiveInfinity => JString("Infinity") + case v: Double if ser.wantsBigDecimal => JDecimal(v) + case v: Double => JDouble(v) })) From e8770f64d24a4dcee3d94783a777cffa309314c2 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 20 Jun 2019 22:54:36 -0700 Subject: [PATCH 19/45] double ser test --- .../json/SpecialDoubleSerializerTest.scala 
| 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala index 87ed5548ef..9138b04a12 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/json/SpecialDoubleSerializerTest.scala @@ -32,7 +32,7 @@ package com.salesforce.op.utils.json import com.salesforce.op.test.TestCommon import org.json4s.jackson.JsonMethods._ -import org.json4s.{DefaultFormats, Extraction} +import org.json4s.{DefaultFormats, Extraction, Formats} import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -41,8 +41,6 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { - implicit val formats = DefaultFormats + new SpecialDoubleSerializer - val data = Map( "normal" -> Seq(-1.1, 0.0, 2.3), "infs" -> Seq(Double.NegativeInfinity, Double.PositiveInfinity), @@ -50,18 +48,28 @@ class SpecialDoubleSerializerTest extends FlatSpec with TestCommon { "nan" -> Seq(Double.NaN) ) - val dataJson = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + Spec[SpecialDoubleSerializer] should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E308,1.7976931348623157E308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats + new SpecialDoubleSerializer) - Spec[SpecialDoubleSerializer] should "write double entries" in { - compact(Extraction.decompose(data)) shouldBe dataJson - } - it should "read double entries" in { - val parsed = parse(dataJson).extract[Map[String, Seq[Double]]] - parsed.keys shouldBe data.keys + Spec[SpecialDoubleSerializer] + " (with big decimal)" should behave like + readWriteDoubleValues(data)( + json = """{"normal":[-1.1,0.0,2.3],"infs":["-Infinity","Infinity"],"minMax":[-1.7976931348623157E+308,1.7976931348623157E+308],"nan":["NaN"]}""" // scalastyle:off + )(DefaultFormats.withBigDecimal + new SpecialDoubleSerializer) - parsed zip data foreach { - case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) - case ((_, a), (_, b)) => a should contain theSameElementsAs b + + def readWriteDoubleValues(input: Map[String, Seq[Double]])(json: String)(implicit formats: Formats): Unit = { + it should "write double entries" in { + compact(Extraction.decompose(input)) shouldBe json + } + it should "read double entries" in { + val parsed = parse(json).extract[Map[String, Seq[Double]]] + parsed.keys shouldBe input.keys + parsed zip input foreach { + case (("nan", a), ("nan", b)) => a.foreach(_.isNaN shouldBe true) + case ((_, a), (_, b)) => a should contain theSameElementsAs b + } } } } From afffc56ed5302479e8d77489326b957d7b4f4b57 Mon Sep 17 00:00:00 2001 From: Matthew Date: Mon, 24 Jun 2019 15:49:46 -0700 Subject: [PATCH 20/45] update mleap and spark testing base --- build.gradle | 4 ++-- pom.xml | 24 ++++++------------------ 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/build.gradle b/build.gradle index 23cc845637..06dc54f80c 100644 --- a/build.gradle +++ b/build.gradle @@ -80,7 +80,7 @@ configure(allProjs) { collectionsVersion = '3.2.2' optimaizeLangDetectorVersion = '0.0.1' tikaVersion = 
'1.22' - sparkTestingBaseVersion = '2.4.0_0.11.0' + sparkTestingBaseVersion = '2.4.3_0.12.0' sourceCodeVersion = '0.1.3' pegdownVersion = '1.4.2' commonsValidatorVersion = '1.6' @@ -88,7 +88,7 @@ configure(allProjs) { scoveragePluginVersion = '1.3.1' xgboostVersion = '0.90' akkaSlf4jVersion = '2.3.11' - mleapVersion = '0.13.0' + mleapVersion = '0.14.0' memoryFilesystemVersion = '2.1.0' } diff --git a/pom.xml b/pom.xml index 07e102ccfd..17d2d5ec50 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,7 @@ ml.dmlc xgboost4j-spark - 0.81 + 0.90 compile @@ -167,18 +167,6 @@ 3.2.11 compile - - com.databricks - spark-avro_2.11 - 4.0.0 - compile - - - avro - org.apache.avro - - - com.fasterxml.jackson.dataformat jackson-dataformat-yaml @@ -200,7 +188,7 @@ com.twitter chill-avro_2.11 - 0.8.4 + 0.9.3 compile @@ -212,7 +200,7 @@ com.twitter chill-algebird_2.11 - 0.8.4 + 0.9.3 compile @@ -260,7 +248,7 @@ org.apache.avro avro - 1.7.7 + 1.8.2 compile @@ -272,13 +260,13 @@ ml.combust.mleap mleap-spark_2.11 - 0.13.0 + 0.14.0 compile ml.combust.mleap mleap-runtime_2.11 - 0.13.0 + 0.14.0 compile From ed43719dc7af46368451285b4cff4dd12716cf28 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Mon, 15 Jul 2019 18:56:04 -0700 Subject: [PATCH 21/45] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 493acbf86d..e251b9302a 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Start by picking TransmogrifAI version to match your project dependencies from t | TransmogrifAI Version | Spark Version | Scala Version | Java Version | |-------------------------------------------------------|:-------------:|:-------------:|:------------:| -| 0.6.2 (unreleased, master) | 2.3 | 2.11 | 1.8 | +| 0.6.2 (unreleased, master) | 2.4 | 2.11 | 1.8 | | **0.6.1 (stable)**, 0.6.0, 0.5.3, 0.5.2, 0.5.1, 0.5.0 | **2.3** | **2.11** | **1.8** | | 0.4.0, 0.3.4 | 2.2 | 2.11 | 1.8 | From 8804acd40dc13f6e4f4cd8122eb8a15a8bb947bc Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Mon, 14 Oct 2019 14:16:59 -0700 Subject: [PATCH 22/45] type fix --- .../org/apache/spark/ml/SparkDefaultParamsReadWrite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala index 1b6d9d674b..3d697588ba 100644 --- a/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala +++ b/features/src/main/scala/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala @@ -51,7 +51,7 @@ case object SparkDefaultParamsReadWrite { instance: OpPipelineStageBase, extraMetadata: Option[JObject] = None, paramMap: Option[JValue] = None - ): String = { + ): JObject = { val uid = instance.uid val cls = instance.getClass.getName val params = instance.paramMap.toSeq @@ -74,8 +74,7 @@ case object SparkDefaultParamsReadWrite { case None => basicMetadata } - val metadataJson: String = compact(render(metadata)) - metadataJson + metadata } /** From b54d0f52a1dd5a99f2a355ea5d8e1402bf5f704a Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Tue, 15 Oct 2019 16:11:28 -0700 Subject: [PATCH 23/45] bump minor version --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index ab8c90bea4..f8fb7ad658 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.6.2-SNAPSHOT +version=0.7.0-SNAPSHOT group=com.salesforce.transmogrifai 
org.gradle.caching=true From cb4cb7b4264782819293390d040945a347ae2c9b Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 18 Oct 2019 13:02:24 -0700 Subject: [PATCH 24/45] Update Spark version in the README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 493acbf86d..e251b9302a 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Start by picking TransmogrifAI version to match your project dependencies from t | TransmogrifAI Version | Spark Version | Scala Version | Java Version | |-------------------------------------------------------|:-------------:|:-------------:|:------------:| -| 0.6.2 (unreleased, master) | 2.3 | 2.11 | 1.8 | +| 0.6.2 (unreleased, master) | 2.4 | 2.11 | 1.8 | | **0.6.1 (stable)**, 0.6.0, 0.5.3, 0.5.2, 0.5.1, 0.5.0 | **2.3** | **2.11** | **1.8** | | 0.4.0, 0.3.4 | 2.2 | 2.11 | 1.8 | From abca58bd21f6d1dc58a30646624e705254175428 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Fri, 18 Oct 2019 14:47:33 -0700 Subject: [PATCH 25/45] bump version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 06dc54f80c..990d6e0f0c 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ configure(allProjs) { scalaCheckVersion = '1.14.0' junitVersion = '4.12' avroVersion = '1.8.2' - sparkVersion = '2.4.3' + sparkVersion = '2.4.4' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' From e58da6cff1a3bba07a70b1f95d6666cc19aae487 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Sat, 19 Oct 2019 10:34:55 -0700 Subject: [PATCH 26/45] Update build.gradle --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 06dc54f80c..990d6e0f0c 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ configure(allProjs) { scalaCheckVersion = '1.14.0' junitVersion = '4.12' avroVersion = '1.8.2' - sparkVersion = '2.4.3' + sparkVersion = '2.4.4' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' From 37614d765fb04426613d8aade32c39860c4307f3 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Sat, 19 Oct 2019 10:40:00 -0700 Subject: [PATCH 27/45] Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 17d2d5ec50..932b575551 100644 --- a/pom.xml +++ b/pom.xml @@ -242,7 +242,7 @@ org.apache.spark spark-sql_2.11 - 2.3.2 + 2.4.4 compile > From 505a881f231ef9ebd56b782fdf0393f5824f61e2 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Tue, 5 Nov 2019 13:05:08 -0800 Subject: [PATCH 28/45] set correct json4s version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 932b575551..37671b119e 100644 --- a/pom.xml +++ b/pom.xml @@ -164,7 +164,7 @@ org.json4s json4s-ext_2.11 - 3.2.11 + 3.5.3 compile From 3ba95b6193eb765565101b91500949094ca8383b Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Fri, 15 Nov 2019 10:38:37 -0800 Subject: [PATCH 29/45] upgrade helloworld deps --- helloworld/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index 3420f5fc95..50299cc7c5 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -37,9 +37,9 @@ ext { scalaVersion = '2.11' scalaVersionRevision = '12' junitVersion = '4.11' - sparkVersion = '2.3.2' + sparkVersion = '2.4.4' scalatestVersion = '3.0.0' - transmogrifaiVersion ='0.6.1' + transmogrifaiVersion ='0.7.0' 
collectionsVersion = '3.2.2' scoveragePluginVersion = '1.3.1' } From d77515c840847a1fb4f247163d93484f456dd696 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Fri, 15 Nov 2019 11:46:27 -0800 Subject: [PATCH 30/45] upgrade notebook deps on TMog and Spark --- helloworld/notebooks/OpHousingPrices.ipynb | 2 +- helloworld/notebooks/OpIris.ipynb | 4 ++-- helloworld/notebooks/OpTitanicSimple.ipynb | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/helloworld/notebooks/OpHousingPrices.ipynb b/helloworld/notebooks/OpHousingPrices.ipynb index 1245753ff5..76c6c59e17 100644 --- a/helloworld/notebooks/OpHousingPrices.ipynb +++ b/helloworld/notebooks/OpHousingPrices.ipynb @@ -16,7 +16,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { diff --git a/helloworld/notebooks/OpIris.ipynb b/helloworld/notebooks/OpIris.ipynb index 4ae4bbf940..3e2fecd3d1 100644 --- a/helloworld/notebooks/OpIris.ipynb +++ b/helloworld/notebooks/OpIris.ipynb @@ -17,7 +17,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { @@ -26,7 +26,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.2" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, { diff --git a/helloworld/notebooks/OpTitanicSimple.ipynb b/helloworld/notebooks/OpTitanicSimple.ipynb index ffc9a3236b..d1a82d3f1c 100644 --- a/helloworld/notebooks/OpTitanicSimple.ipynb +++ b/helloworld/notebooks/OpTitanicSimple.ipynb @@ -22,7 +22,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.6.1" + "%classpath add mvn com.salesforce.transmogrifai transmogrifai-core_2.11 0.7.0" ] }, { @@ -31,7 +31,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.3.2" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, { From b8a43b5965a9c75aeaf7168676566e6897691f96 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Thu, 5 Dec 2019 09:35:50 -0800 Subject: [PATCH 31/45] bump to version 0.7.0 for Spark update --- README.md | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e251b9302a..3d69bdec03 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Start by picking TransmogrifAI version to match your project dependencies from t | TransmogrifAI Version | Spark Version | Scala Version | Java Version | |-------------------------------------------------------|:-------------:|:-------------:|:------------:| -| 0.6.2 (unreleased, master) | 2.4 | 2.11 | 1.8 | +| 0.7.0 (unreleased, master) | 2.4 | 2.11 | 1.8 | | **0.6.1 (stable)**, 0.6.0, 0.5.3, 0.5.2, 0.5.1, 0.5.0 | **2.3** | **2.11** | **1.8** | | 0.4.0, 0.3.4 | 2.2 | 2.11 | 1.8 | diff --git a/pom.xml b/pom.xml index 37671b119e..47564c8c12 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.salesforce.transmogrifai TransmogrifAI - 0.6.1 + 0.7.0 TransmogrifAI AutoML library for building modular, reusable, strongly typed machine learning workflows on Spark with minimal hand tuning https://github.com/salesforce/TransmogrifAI From 094be05f75b0d0c2201aaf278cc3b4d5192668e5 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Thu, 5 Dec 2019 
09:36:30 -0800 Subject: [PATCH 32/45] align helloworld dependencies --- helloworld/build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index 50299cc7c5..fd6df45460 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -9,7 +9,6 @@ buildscript { } plugins { id 'com.github.johnrengelman.shadow' version '5.0.0' - id 'com.commercehub.gradle.plugin.avro' version '0.8.0' id 'org.scoverage' version '2.5.0' } repositories { @@ -36,7 +35,7 @@ mainClassName = "please.set.main.class.in.build.gradle" ext { scalaVersion = '2.11' scalaVersionRevision = '12' - junitVersion = '4.11' + junitVersion = '4.12' sparkVersion = '2.4.4' scalatestVersion = '3.0.0' transmogrifaiVersion ='0.7.0' From 4bd59774d18533d29f5268d787ad9a203dbddaf5 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Thu, 5 Dec 2019 09:39:49 -0800 Subject: [PATCH 33/45] align helloworld dependencies --- helloworld/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index fd6df45460..0ca0c21f88 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -5,6 +5,7 @@ buildscript { } dependencies { classpath 'org.github.ngbinh.scalastyle:gradle-scalastyle-plugin_2.11:1.0.1' + classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0' } } plugins { From 46fc60af3ebcf133e6c2c433c789ad63bd58133b Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Fri, 6 Dec 2019 13:31:15 -0800 Subject: [PATCH 34/45] get -> getOrElse with exception --- .../op/stages/sparkwrappers/specific/OpPredictionModel.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala index bc59b13ba8..54b7777926 100644 --- a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala +++ b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/OpPredictionModel.scala @@ -55,7 +55,9 @@ abstract class OpPredictionModel[T <: PredictionModel[Vector, T]] /** * Predict label for the given features */ - @transient protected lazy val predict: Vector => Double = getSparkMlStage().get.predict(_) + @transient protected lazy val predict: Vector => Double = getSparkMlStage().getOrElse( + throw new RuntimeException(s"Could not find the wrapped Spark stage.") + ).predict(_) /** * Function used to convert input to output From 4d686c78347f467eb8ed7129f1a75e71c46f9e36 Mon Sep 17 00:00:00 2001 From: Nico de Vos Date: Tue, 7 Jan 2020 12:50:55 -0800 Subject: [PATCH 35/45] fix helloworld compilation --- helloworld/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index 0ca0c21f88..760e95a23a 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -39,7 +39,7 @@ ext { junitVersion = '4.12' sparkVersion = '2.4.4' scalatestVersion = '3.0.0' - transmogrifaiVersion ='0.7.0' + transmogrifaiVersion ='0.6.1' collectionsVersion = '3.2.2' scoveragePluginVersion = '1.3.1' } From b3c0a7452d6478dbd05e52358ffbd05a0081cd8a Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 13 Feb 2020 18:08:31 -0800 Subject: [PATCH 36/45] Spark 2.4.5 --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 990d6e0f0c..2e0e3dc087 100644 --- a/build.gradle +++ b/build.gradle 
@@ -60,7 +60,7 @@ configure(allProjs) { scalaCheckVersion = '1.14.0' junitVersion = '4.12' avroVersion = '1.8.2' - sparkVersion = '2.4.4' + sparkVersion = '2.4.5' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' From f4ab3fda85ab8d62d2c9f182a53d30a67baecaac Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 13 Feb 2020 18:09:09 -0800 Subject: [PATCH 37/45] Spark 2.4.5 --- helloworld/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index 760e95a23a..b1480bfe61 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -37,7 +37,7 @@ ext { scalaVersion = '2.11' scalaVersionRevision = '12' junitVersion = '4.12' - sparkVersion = '2.4.4' + sparkVersion = '2.4.5' scalatestVersion = '3.0.0' transmogrifaiVersion ='0.6.1' collectionsVersion = '3.2.2' From 50d9dfb98398039a6c6e46f9c71f0f8a92baabab Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 13 Feb 2020 18:10:03 -0800 Subject: [PATCH 38/45] Spark 2.4.5 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 401b01e0b2..e051bdc181 100644 --- a/pom.xml +++ b/pom.xml @@ -242,7 +242,7 @@ org.apache.spark spark-sql_2.11 - 2.4.4 + 2.4.5 compile From 341797238ecc039d5d5810c4244ccecf14080c3d Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 13 Feb 2020 18:10:36 -0800 Subject: [PATCH 39/45] Update OpTitanicSimple.ipynb --- helloworld/notebooks/OpTitanicSimple.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/notebooks/OpTitanicSimple.ipynb b/helloworld/notebooks/OpTitanicSimple.ipynb index d1a82d3f1c..392886e6fb 100644 --- a/helloworld/notebooks/OpTitanicSimple.ipynb +++ b/helloworld/notebooks/OpTitanicSimple.ipynb @@ -31,7 +31,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.5" ] }, { From df38bccff49d91698688bc5e7fa55d733aae0450 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Thu, 13 Feb 2020 18:10:57 -0800 Subject: [PATCH 40/45] Update OpIris.ipynb --- helloworld/notebooks/OpIris.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/notebooks/OpIris.ipynb b/helloworld/notebooks/OpIris.ipynb index 3e2fecd3d1..c68ebe406f 100644 --- a/helloworld/notebooks/OpIris.ipynb +++ b/helloworld/notebooks/OpIris.ipynb @@ -26,7 +26,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.5" ] }, { From 3af27f3f8011979db85acb1ca379cb4ca2f08929 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 14 Feb 2020 17:27:48 -0800 Subject: [PATCH 41/45] Revert "Spark 2.4.5" This reverts commit b3c0a7452d6478dbd05e52358ffbd05a0081cd8a. --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 2e0e3dc087..990d6e0f0c 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ configure(allProjs) { scalaCheckVersion = '1.14.0' junitVersion = '4.12' avroVersion = '1.8.2' - sparkVersion = '2.4.5' + sparkVersion = '2.4.4' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' hadoopVersion = 'hadoop2' From 66afd79334944927ab0a8fe1eeced786c497d9ae Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 14 Feb 2020 17:28:09 -0800 Subject: [PATCH 42/45] Revert "Spark 2.4.5" This reverts commit f4ab3fda85ab8d62d2c9f182a53d30a67baecaac. 
--- helloworld/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/build.gradle b/helloworld/build.gradle index b1480bfe61..760e95a23a 100644 --- a/helloworld/build.gradle +++ b/helloworld/build.gradle @@ -37,7 +37,7 @@ ext { scalaVersion = '2.11' scalaVersionRevision = '12' junitVersion = '4.12' - sparkVersion = '2.4.5' + sparkVersion = '2.4.4' scalatestVersion = '3.0.0' transmogrifaiVersion ='0.6.1' collectionsVersion = '3.2.2' From 39ff7555f63eb40b706b0966e9c79cd6f5fc9758 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 14 Feb 2020 17:28:14 -0800 Subject: [PATCH 43/45] Revert "Spark 2.4.5" This reverts commit 50d9dfb98398039a6c6e46f9c71f0f8a92baabab. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e051bdc181..401b01e0b2 100644 --- a/pom.xml +++ b/pom.xml @@ -242,7 +242,7 @@ org.apache.spark spark-sql_2.11 - 2.4.5 + 2.4.4 compile From 09c5f2b680978c6fb1737ad150c29c19d07068be Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 14 Feb 2020 17:28:20 -0800 Subject: [PATCH 44/45] Revert "Update OpTitanicSimple.ipynb" This reverts commit 341797238ecc039d5d5810c4244ccecf14080c3d. --- helloworld/notebooks/OpTitanicSimple.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/notebooks/OpTitanicSimple.ipynb b/helloworld/notebooks/OpTitanicSimple.ipynb index 392886e6fb..d1a82d3f1c 100644 --- a/helloworld/notebooks/OpTitanicSimple.ipynb +++ b/helloworld/notebooks/OpTitanicSimple.ipynb @@ -31,7 +31,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.5" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, { From d79bcc39bf34b33264f0a91002fecc357a040205 Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Fri, 14 Feb 2020 17:28:26 -0800 Subject: [PATCH 45/45] Revert "Update OpIris.ipynb" This reverts commit df38bccff49d91698688bc5e7fa55d733aae0450. --- helloworld/notebooks/OpIris.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helloworld/notebooks/OpIris.ipynb b/helloworld/notebooks/OpIris.ipynb index c68ebe406f..3e2fecd3d1 100644 --- a/helloworld/notebooks/OpIris.ipynb +++ b/helloworld/notebooks/OpIris.ipynb @@ -26,7 +26,7 @@ "metadata": {}, "outputs": [], "source": [ - "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.5" + "%classpath add mvn org.apache.spark spark-mllib_2.11 2.4.4" ] }, {
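
Note (illustrative sketch, not part of the patch series): patch 19 above reworks SpecialDoubleSerializerTest into a shared behavior function so the same read/write assertions run under both DefaultFormats and DefaultFormats.withBigDecimal. A minimal standalone round-trip of that idea, assuming only json4s and the project's own SpecialDoubleSerializer (the object name and main method below are hypothetical), might look like:

    import org.json4s.{DefaultFormats, Extraction, Formats}
    import org.json4s.jackson.JsonMethods._
    import com.salesforce.op.utils.json.SpecialDoubleSerializer

    object SpecialDoubleRoundTrip {
      // Values json4s cannot emit as plain JSON numbers without a custom serializer
      val data = Map(
        "infs" -> Seq(Double.NegativeInfinity, Double.PositiveInfinity),
        "nan" -> Seq(Double.NaN)
      )

      // Serialize and parse back under whichever Formats instance is supplied
      def roundTrip(implicit formats: Formats): Map[String, Seq[Double]] = {
        val json = compact(Extraction.decompose(data)) // NaN/Infinity rendered as quoted strings
        parse(json).extract[Map[String, Seq[Double]]]  // and extracted back into doubles
      }

      def main(args: Array[String]): Unit = {
        println(roundTrip(DefaultFormats + new SpecialDoubleSerializer))
        println(roundTrip(DefaultFormats.withBigDecimal + new SpecialDoubleSerializer))
      }
    }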