Skip to content

Commit

Permalink
Ensure correct metrics despite model failures on some CV folds (#404)
Browse files Browse the repository at this point in the history
* allow for model to fail on some CV folds and still get the metrics calculations correct
* code cleanup
  • Loading branch information
leahmcguire authored and gerashegalov committed Sep 11, 2019
1 parent 95a77b1 commit d223c46
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import com.salesforce.op.evaluators.{EvaluationMetrics, OpEvaluatorBase}
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.selector.ModelSelectorNames
import com.salesforce.op.utils.stages.FitStagesUtil._
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.util.SparkThreadUtils
import com.twitter.algebird._
import com.twitter.algebird.Operators._

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -49,24 +50,38 @@ private[op] class OpCrossValidation[M <: Model[_], E <: Estimator[_]]
) extends OpValidator[M, E] {

// Validation name of this validator, taken from ModelSelectorNames.CrossValResults.
val validationName: String = ModelSelectorNames.CrossValResults
// BLAS handle; within this view it is referenced only by the `blas.dscal` call in findBestModel.
// NOTE(review): that call looks like part of the array-based averaging this commit replaces -
// confirm whether this field is still needed.
private val blas = BLAS.getInstance()

/**
* Settings of this cross-validation as a name -> value map.
* The evaluator is reported through its human friendly name rather than as an object.
*
* @return map with the keys numFolds, seed, evaluator, stratify and parallelism
*/
override def getParams(): Map[String, Any] = Map(
  "numFolds" -> numFolds,
  "seed" -> seed,
  "evaluator" -> evaluator.name.humanFriendlyName,
  "stratify" -> stratify,
  "parallelism" -> parallelism
)

// Implicit definitions carry explicit types: an implicit val with an inferred type can be skipped
// by implicit search depending on declaration order in Scala 2 and is rejected by Scala 3.

// Semigroup that combines Doubles by plain addition.
// NOTE(review): presumably the instance picked up by the `sumByKey` calls in findBestModel - confirm.
private implicit val doubleSemigroup: Semigroup[Double] = Semigroup.from[Double](_ + _)
// Monoid for Map[String, Double] whose values are combined with doubleSemigroup.
private implicit val mapDoubleMonoid: Monoid[Map[String, Double]] =
  Monoid.mapMonoid[String, Double](doubleSemigroup)

/**
* Picks the best parameter grid of one model from its validation results and returns it together
* with the averaged metrics.
* Should be called only on instances of the same model
*
* NOTE(review): this span is a rendered diff without +/- markers, so lines of two implementations
* are interleaved: an array-based one (`reduce` + `blas.dscal`, `grid(bestIndex)`) and a per-grid
* one (`gridCounts` / `gridMetrics`). Names such as `metrics`, `bestMetric` and `bestIndex` are
* therefore bound twice in this listing - confirm against the actual file which lines are live.
*
* @param folds validation results for the same estimator (presumably one entry per fold)
* @return a ValidatedModel holding the estimator, the index of the best grid, the averaged
*         metrics and the grids those metrics belong to
*/
private def findBestModel(
folds: Seq[ValidatedModel[E]]
): ValidatedModel[E] = {
// Array-based variant: combine the metric arrays of all folds with `+` (presumably element-wise,
// via the imported algebird operators - confirm), then scale the result by 1 / numFolds.
val metrics = folds.map(_.metrics).reduce(_ + _)
blas.dscal(metrics.length, 1.0 / numFolds, metrics, 1)
val ValidatedModel(est, _, _, grid) = folds.head
log.info(s"Average cross-validation for $est metrics: {}", metrics.toSeq.mkString(","))
// Best entry is the largest or smallest averaged metric, depending on the evaluator.
val (bestMetric, bestIndex) =
if (evaluator.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
else metrics.zipWithIndex.minBy(_._1)
log.info(s"Best set of parameters:\n${grid(bestIndex)}")

// Per-grid variant: count in how many entries of `folds` each parameter grid appears ...
val gridCounts = folds.flatMap(_.grids.map(_ -> 1)).sumByKey
// ... and keep only the grids that appear the maximum number of times, so grids that are
// missing from some folds are left out of the comparison.
val (_, maxFolds) = gridCounts.maxBy{ case (_, count) => count }
val gridsIn = gridCounts.filter{ case (_, foldCount) => foldCount == maxFolds }.keySet

// Average metric per retained grid: every fold contributes metric / maxFolds and the
// contributions are summed by grid.
val gridMetrics = folds.flatMap{
f => f.grids.zip(f.metrics).collect { case (pm, met) if gridsIn.contains(pm) => (pm, met / maxFolds) }
}.sumByKey

// Pick the grid with the largest or smallest averaged metric, depending on the evaluator;
// bestIndex is the position of that grid in gridMetrics' iteration order.
val ((bestGrid, bestMetric), bestIndex) =
if (evaluator.isLargerBetter) gridMetrics.zipWithIndex.maxBy{ case ((_, metric), _) => metric}
else gridMetrics.zipWithIndex.minBy{ case ((_, metric), _) => metric}

// The estimator is taken from the first entry; the other fields of that entry are ignored here.
val ValidatedModel(est, _, _, _) = folds.head
log.info(s"Average cross-validation for $est metrics: {}", gridMetrics.mkString(","))
log.info(s"Best set of parameters:\n$bestGrid")
log.info(s"Best cross-validation metric: $bestMetric.")
// Result of the array-based variant.
ValidatedModel(est, bestIndex, metrics, grid)
// Result of the per-grid variant: split gridMetrics into grids and metrics in the same order
// that bestIndex was computed against.
val (grid, metrics) = gridMetrics.unzip
ValidatedModel(est, bestIndex, metrics.toArray, grid.toArray)
}

private[op] override def validate[T](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
justScores.length shouldEqual transformedData.count()
}

it should "fit and predict for even when some models fail" in {
it should "fit and predict even when some models fail" in {
val testEstimator = RegressionModelSelector
.withCrossValidation(
numFolds = 4,
Expand All @@ -240,8 +240,31 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
assert(metaData.trainEvaluation.toJson(false).contains(s"${metric.entryName}"),
s"Metric ${metric.entryName} is not present in metadata: " + metaData)
)
metaData.validationResults.foreach(println(_))
metaData.validationResults.size shouldBe 42
metaData.validationResults.size shouldBe 40
}


// Cross-validates a selector restricted to a single model type (generalized linear regression)
// and checks the train evaluation metadata and the number of validation results.
// NOTE(review): presumably some of that model's default parameter settings fail on this data,
// which is what the test name refers to - confirm.
it should "fit and predict even when some parameter settings fail for one of the models" in {
  val selector = RegressionModelSelector.withCrossValidation(
    numFolds = 4,
    validationMetric = Evaluators.Regression.mse(),
    seed = 10L,
    modelTypesToUse = Seq(RMT.OpGeneralizedLinearRegression)
  ).setInput(label, features)

  val fitted = selector.fit(data)
  fitted.evaluateModel(data)

  // evaluation metrics from train set should be in metadata
  val summary = ModelSelectorSummary.fromMetadata(fitted.getMetadata().getSummaryMetadata())
  for (metric <- RegressionEvalMetrics.values) {
    assert(
      summary.trainEvaluation.toJson(false).contains(s"${metric.entryName}"),
      s"Metric ${metric.entryName} is not present in metadata: " + summary
    )
  }
  summary.validationResults.size shouldBe 32
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,23 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model

// Estimators and parameter grids used by the tests in this spec.
// NOTE(review): this span is a rendered diff without +/- markers - wherever two consecutive
// `.addGrid` calls set the same parameter, they look like the previous and the updated version
// of one line, not two calls. Confirm against the actual file.
// NOTE(review): the very large regParam / minInfoGain values are presumably chosen so that some
// settings produce degenerate or failing models - confirm.

// Logistic regression with a grid over regParam and elasticNetParam.
private val lr = new OpLogisticRegression()
private val lrParams = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 100))
.addGrid(lr.elasticNetParam, Array(0, 0.5)).build()
.addGrid(lr.regParam, Array(0.1, 10000))
.addGrid(lr.elasticNetParam, Array(0.5)).build()

// Random forest classifier with a grid over numTrees and minInfoGain.
private val rf = new OpRandomForestClassifier()
private val rfParams = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(2, 4))
.addGrid(rf.minInfoGain, Array(100.0, 10.0)).build()
.addGrid(rf.minInfoGain, Array(1000.0, 100.0)).build()

// Linear regression with a grid over regParam and maxIter.
private val linR = new OpLinearRegression()
private val linRParams = new ParamGridBuilder()
.addGrid(linR.regParam, Array(0.1, 100))
.addGrid(linR.regParam, Array(0.1, 1000))
.addGrid(linR.maxIter, Array(10, 20)).build()

// Random forest regressor with a grid over numTrees and minInfoGain.
private val rfR = new OpRandomForestRegressor()
private val rfRParams = new ParamGridBuilder()
.addGrid(rfR.numTrees, Array(2, 4))
.addGrid(rfR.minInfoGain, Array(100.0, 10.0)).build()
.addGrid(rfR.minInfoGain, Array(1000.0, 100.0)).build()

val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features",
Seq[(RealNN, OPVector)](
Expand Down

0 comments on commit d223c46

Please sign in to comment.