Added handling of commodity and yield columns.
New extended GfwProFeatureExt and GfwProFeatureExtId classes. Added an
option to pass the feature id (which includes the commodity and yield)
through to the summary analysis, as sketched below. This requires disabling
the optimization that shares full-window results among features that
completely contain the same window.
danscales committed Dec 31, 2024
1 parent 449b7d6 commit b50b0fe
Showing 10 changed files with 107 additions and 62 deletions.
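
For orientation, here is a minimal, self-contained sketch of the mechanism this commit introduces (plain Scala with hypothetical names such as FeatureIdSketch and summarize; the real code is in ErrorSummaryRDD and GHGSummary below): when the "includeFeatureId" flag is present in kwargs, each feature's id, which now carries commodity and yield, is added to that feature's own kwargs before the polygonal summary runs, so no two features can share a cached full-window result.

// Sketch only: stand-ins for the real feature id and summary code.
case class FeatureIdSketch(listId: String, locationId: Int, commodity: String, yieldVal: Float)

object KwargsFlowSketch {
  type Kwargs = Map[String, Any]

  // Stand-in for the per-pixel summary: it only sees kwargs, so the feature id
  // must be injected there if the analysis needs the commodity/yield columns.
  def summarize(kwargs: Kwargs): String = {
    val id = kwargs("featureId").asInstanceOf[FeatureIdSketch]
    // A positive yield on the feature wins; otherwise a commodity default would
    // be looked up from a raster (elided here, see GHGSummary below).
    val cropYield = if (id.yieldVal > 0.0f) id.yieldVal else 1.0f
    s"${id.listId}/${id.locationId}: commodity=${id.commodity}, yield=$cropYield"
  }

  def main(args: Array[String]): Unit = {
    val kwargs: Kwargs = Map("includeFeatureId" -> true)
    val ids = List(
      FeatureIdSketch("mylist", 1, "COCO", 0.0f),
      FeatureIdSketch("mylist", 2, "SOYB", 2.5f)
    )
    // Each feature gets its own kwargs (kwargs + ("featureId" -> id)), which is
    // why the full-window result-sharing optimization has to be skipped.
    ids.foreach { id => println(summarize(kwargs + ("featureId" -> id))) }
  }
}
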
3 changes: 2 additions & 1 deletion src/main/scala/org/globalforestwatch/features/Feature.scala
@@ -52,9 +52,10 @@ object Feature {
case "modis" => FireAlertModisFeature
case "burned_areas" => BurnedAreasFeature
case "gfwpro" => GfwProFeature
case "gfwpro_ext" => GfwProFeatureExt
case value =>
throw new IllegalArgumentException(
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
)
}
}
}
@@ -22,7 +22,6 @@ object FeatureFilter extends LazyLogging {
case "viirs" => FeatureFilter.empty
case "modis" => FeatureFilter.empty
case "burned_areas" => FeatureFilter.empty
case "gfwpro" => GfwProFeature.Filter(options.base, options.featureId)
case value =>
throw new IllegalArgumentException(
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
@@ -12,6 +12,7 @@ object FeatureId {
case "wdpa" => WdpaFeature.getFeatureId(values, true)
case "geostore" => GeostoreFeature.getFeatureId(values, true)
case "gfwpro" => GfwProFeature.getFeatureId(values, true)
case "gfwpro_ext" => GfwProFeatureExt.getFeatureId(values, true)
case "burned_areas" => BurnedAreasFeature.getFeatureId(values, true)
case "viirs" => FireAlertViirsFeature.getFeatureId(values, true)
case "modis" => FireAlertModisFeature.getFeatureId(values, true)
17 changes: 0 additions & 17 deletions src/main/scala/org/globalforestwatch/features/GfwProFeature.scala
@@ -1,11 +1,6 @@
package org.globalforestwatch.features

import org.apache.spark.sql.functions.col
import org.globalforestwatch.summarystats.SummaryCommand
import org.apache.spark.sql.Column

object GfwProFeature extends Feature {

val listIdPos = 0
val locationIdPos = 1
val geomPos = 2
@@ -19,16 +14,4 @@ object GfwProFeature {

GfwProFeatureId(listId, locationId)
}

case class Filter(
base: Option[SummaryCommand.BaseFilter],
id: Option[SummaryCommand.FeatureIdFilter]
) extends FeatureFilter {
def filterConditions: List[Column] = {
// TODO: is "iso" column same as "iso3"? gadm.isoFirst was applied to "iso" before
// TODO: add wdpaID filter
base.toList.flatMap(_.filters()) ++
id.toList.flatMap(_.filters(idColumn=col("location_id")))
}
}
}
@@ -0,0 +1,23 @@
package org.globalforestwatch.features

// GfwPro Feature that includes commodity and yield columns for GHG analysis.

object GfwProFeatureExt extends Feature {
val listIdPos = 0
val locationIdPos = 1
val geomPos = 2
val commodityPos = 3
val yieldPos = 4

val featureIdExpr = "list_id as listId, cast(location_id as int) as locationId, commodity as commodity, yield as yield"

def getFeatureId(i: Array[String], parsed: Boolean = false): FeatureId = {

val listId: String = i(0)
val locationId: Int = i(1).toInt
val commodity: String = i(2)
val yieldVal: Float = i(3).toFloat

GfwProFeatureExtId(listId, locationId, commodity, yieldVal)
}
}
@@ -0,0 +1,7 @@
package org.globalforestwatch.features

// GfwPro FeatureId that includes commodity and yield for GHG analysis.

case class GfwProFeatureExtId(listId: String, locationId: Int, commodity: String, yieldVal: Float) extends FeatureId {
override def toString: String = s"$listId, $locationId, $commodity, $yieldVal"
}
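
A brief usage sketch of the two new definitions above (the row values are made up; the values array is assumed to follow the column order produced by featureIdExpr, with the geometry column handled separately):

val values = Array("mylist", "17", "COCO", "0.0")
val id = GfwProFeatureExt.getFeatureId(values)
// id == GfwProFeatureExtId("mylist", 17, "COCO", 0.0f)
// id.toString == "mylist, 17, COCO, 0.0"
// A yield of 0.0 means the analysis falls back to the commodity's default
// yield raster (see the yield selection logic in GHGSummary below).
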
@@ -102,30 +102,15 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
)
}

// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

def getSummaryForGeom(featureIds: List[FEATUREID], geom: Geometry): List[(FEATUREID, ValidatedSummary[SUMMARY])] = {
def getSummaryForGeom(geom: Geometry, mykwargs: Map[String, Any]): ValidatedSummary[SUMMARY] = {
val summary: Either[JobError, PolygonalSummaryResult[SUMMARY]] =
maybeRaster.flatMap { raster =>
Either.catchNonFatal {
runPolygonalSummary(
raster,
geom,
ErrorSummaryRDD.rasterizeOptions,
kwargs)
mykwargs)
}.left.map{
// TODO: these should be moved into left side of PolygonalSummaryResult in GT
case ise: java.lang.IllegalStateException =>
@@ -140,31 +125,60 @@
}
// Converting to Validated so errors across partial results can be accumulated
// @see https://typelevel.org/cats/datatypes/validated.html#validated-vs-either
featureIds.map { id => (id, summary.toValidated) }
summary.toValidated
}

// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.flatMap {
case feature =>
getSummaryForGeom(List(feature.data), feature.geom)
}
if (kwargs.get("includeFeatureId").isDefined) {
// Include the featureId in the kwargs passed to the polygonalSummary
// code. This is to handle the case where the featureId may include
// one or more columns that are used by the analysis. In that case,
// we can't do the optimization where we share fullWindow results
// across features.
val partialSummaries: Array[(FEATUREID, ValidatedSummary[SUMMARY])] =
features.map { feature: Feature[Geometry, FEATUREID] =>
val id: FEATUREID = feature.data
(id, getSummaryForGeom(feature.geom, kwargs + ("featureId" -> id)))
}
partialSummaries
} else {
// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
//if (fullWindowIds.size >= 2) {
// println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
//}
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
getSummaryForGeom(fullWindowIds, windowGeom)
} else {
List.empty
}
// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.map {
case feature =>
(feature.data, getSummaryForGeom(feature.geom, kwargs))
}

// combine results
partialWindowResults ++ fullWindowResults
// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
//if (fullWindowIds.size >= 2) {
// println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
//}
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) }
} else {
List.empty
}

// combine results
partialWindowResults ++ fullWindowResults
}
}
}

@@ -53,11 +53,14 @@ object GHGCommand extends SummaryCommand with LazyLogging {
backupDF.printSchema()
val broadcastArray = spark.sparkContext.broadcast(backupDF.collect())
println(s"Done read")
val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true)
val featureRDD = ValidatedFeatureRDD(default.featureUris, "gfwpro_ext", featureFilter, splitFeatures = true)

// We add the option to include the featureId in the kwargs passed to the
// polygonalSummary for each feature. This allows us to use the commodity
// and yield columns to determine the yield to use for each pixel.
val fcdRDD = GHGAnalysis(
featureRDD,
kwargs + ("backupYield" -> broadcastArray)
kwargs + ("backupYield" -> broadcastArray) + ("includeFeatureId" -> true)
)

val fcdDF = GHGDF.getFeatureDataFrame(fcdRDD, spark)
@@ -18,7 +18,7 @@ object GHGDF extends SummaryDF {
import spark.implicits._

val rowId: FeatureId => RowId = {
case gfwproId: GfwProFeatureId =>
case gfwproId: GfwProFeatureExtId =>
RowId(gfwproId.listId, gfwproId.locationId.toString)
case id =>
throw new IllegalArgumentException(s"Can't produce DataFrame for $id")
@@ -7,6 +7,7 @@ import org.globalforestwatch.summarystats.Summary
import org.globalforestwatch.util.Geodesy
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row
import org.globalforestwatch.features.GfwProFeatureExtId

/** GHGRawData broken down by GHGRawDataGroup, which includes the loss year and crop yield */
case class GHGSummary(
@@ -59,6 +60,8 @@ object GHGSummary {
col: Int,
row: Int): Unit = {

val featureId = kwargs("featureId").asInstanceOf[GfwProFeatureExtId]

// This is a pixel by pixel operation

// pixel Area
@@ -82,8 +85,19 @@
}
}

// Get default yield based on commodity
var cropYield = raster.tile.soybYield.getData(col, row)
var cropYield = if (featureId.yieldVal > 0.0) {
featureId.yieldVal
} else {
// Get default yield based on commodity
featureId.commodity match {
case "COCO" => raster.tile.cocoYield.getData(col, row)
case "COFF" => raster.tile.coffYield.getData(col, row)
case "OILP" => raster.tile.oilpYield.getData(col, row)
case "RUBB" => raster.tile.rubbYield.getData(col, row)
case "SOYB" => raster.tile.soybYield.getData(col, row)
case _ => throw new Exception("Invalid commodity")
}
}
println(s"Yield ${cropYield}, (${col}, ${row})")
if (cropYield == 0) {
println("Empty cocoa yield")
