Adding in backup yield dataset as simple array of rows.
We don't want to call the data API from tasks, and the backup yield dataset
is only 25 megabytes, so we just broadcast it to all executors and look
values up in it when needed (infrequently), treating it as an array (or
possibly a hashtable) of rows.
danscales committed Dec 16, 2024
1 parent 0618f7b commit cbcc05e
Showing 2 changed files with 32 additions and 3 deletions.
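
As the commit message notes, the broadcast copy can be treated as a plain array of rows or as a hashtable. The hashtable variant is not part of this commit, but a minimal sketch of it might look like the following; it assumes the FIPS2 and commodity columns that appear in the debug code further down, and the helper name broadcastBackupYield is hypothetical.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Row, SparkSession}

// Sketch only (not part of the commit): read the small backup-yield CSV on the
// driver, index it by (FIPS2, commodity), and broadcast the map once per executor.
def broadcastBackupYield(spark: SparkSession, urls: Seq[String]): Broadcast[Map[(String, String), Row]] = {
  val backupDF = spark.read
    .options(Map("header" -> "true", "delimiter" -> ",", "escape" -> "\""))
    .csv(urls: _*)
  val byKey = backupDF
    .collect()
    .map(r => (r.getAs[String]("FIPS2"), r.getAs[String]("commodity")) -> r)
    .toMap
  spark.sparkContext.broadcast(byKey)
}

Tasks could then do constant-time lookups via the broadcast map instead of scanning the array; at roughly 25 megabytes either approach is cheap, which is presumably why this commit keeps the simpler array.
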
@@ -15,14 +15,21 @@ object GHGCommand extends SummaryCommand with LazyLogging {
   val GHGYearStart: Int = 2020
   val GHGYearEnd: Int = 2023
 
+  val backupYieldOpt: Opts[NonEmptyList[String]] = Opts
+    .options[String](
+      "backup_yield_url",
+      help = "URI of backup yield file in CSV format"
+    )
+
   val ghgCommand: Opts[Unit] = Opts.subcommand(
     name = GHGAnalysis.name,
     help = "Compute greenhouse gas emissions factors for GFW Pro locations."
   ) {
     (
       defaultOptions,
-      featureFilterOptions
-    ).mapN { (default, filterOptions) =>
+      featureFilterOptions,
+      backupYieldOpt
+    ).mapN { (default, filterOptions, backupYieldUrl) =>
       val kwargs = Map(
         "outputUrl" -> default.outputUrl,
         "noOutputPathSuffix" -> default.noOutputPathSuffix,
@@ -35,11 +42,22 @@ object GHGCommand extends SummaryCommand with LazyLogging {
       val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)
 
       runAnalysis { implicit spark =>
+        println("Starting read")
+        // Read in the backup yield file. Then we're arranging to broadcast a copy to
+        // each node once, rather than copying into each task. The broadcast makes
+        // sense (as opposed to a rasterization or spatial partitioning), because the
+        // file is currently only 26 megabytes.
+        val backupDF = spark.read
+          .options(Map("header" -> "true", "delimiter" -> ",", "escape" -> "\""))
+          .csv(backupYieldUrl.toList: _*)
+        backupDF.printSchema()
+        val broadcastArray = spark.sparkContext.broadcast(backupDF.collect())
+        println(s"Done read")
         val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true)
 
         val fcdRDD = GHGAnalysis(
           featureRDD,
-          kwargs
+          kwargs + ("backupYield" -> broadcastArray)
         )
 
         val fcdDF = GHGDF.getFeatureDataFrame(fcdRDD, spark)
@@ -5,6 +5,8 @@ import geotrellis.raster._
 import geotrellis.raster.summary.GridVisitor
 import org.globalforestwatch.summarystats.Summary
 import org.globalforestwatch.util.Geodesy
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
 
 /** GHGRawData broken down by GHGRawDataGroup, which includes the loss year and crop yield */
 case class GHGSummary(
@@ -84,6 +86,15 @@ object GHGSummary {
         var cropYield = raster.tile.cocoYield.getData(col, row)
         if (cropYield == 0) {
           println("Empty cocoa yield")
+          val backupArray = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value
+          for (r <- backupArray) {
+            if (r.getAs[String]("FIPS2") == "ZI10007" && r.getAs[String]("commodity") == "BANA") {
+              println(s"Found row $r")
+            }
+          }
+          //val r = backupDF.filter(col("FIPS2") === "AC01001" && col("commodity") == "BANA")
+          //r.show()
+          println("OK")
         }
 
         // Compute gross emissions Co2-equivalent due to tree loss at this pixel.
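
The loop in the hunk above is a temporary probe that only prints one hard-coded FIPS2/commodity row. A sketch of what the eventual fallback lookup might look like, reusing the Broadcast[Array[Row]] that this commit passes through kwargs, is below; the yield column name is an assumption, not something the commit defines.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row

// Sketch only: when the raster has no cocoa yield at a pixel, fall back to the
// broadcast backup-yield table, matching on FIPS2 and commodity. The CSV is read
// without a schema, so values come back as strings and the (hypothetical) yield
// column is parsed here.
def backupCropYield(kwargs: Map[String, Any], fips2: String, commodity: String): Option[Double] = {
  val rows = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value
  rows
    .find(r => r.getAs[String]("FIPS2") == fips2 && r.getAs[String]("commodity") == commodity)
    .map(_.getAs[String]("yield").toDouble)
}

Since the commit message says the lookup happens infrequently, a linear scan over the small array is likely fine; switching to the keyed map sketched earlier would be a straightforward refinement.
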
