GTC-3055 Added new GHG analysis
Currently assumes the crop is cocoa when determining the yield value.
danscales committed Nov 27, 2024
1 parent bc0dbdd commit 0618f7b
Showing 19 changed files with 699 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/main/resources/raster-catalog-pro.json
@@ -267,6 +267,18 @@
{
"name":"col_frontera_agricola",
"source_uri":"s3://gfw-data-lake/col_frontera_agricola/v2024/raster/epsg-4326/{grid_size}/{row_count}/category/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_coco_yield",
"source_uri":"s3://gfw-data-lake/mapspam_coco_yield/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2_ha-1/geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2e_ha-1/geotiff/{tile_id}.tif"
}
]
}
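For reference, a layer fills in the {grid_size}, {row_count}, and {tile_id} placeholders per tile when it resolves its URI. A hypothetical expansion for one tile (the concrete values here are illustrative, not taken from this commit):

s3://gfw-data-lake/mapspam_coco_yield/v2020/raster/epsg-4326/10/40000/yield/gdal-geotiff/00N_000E.tif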
13 changes: 13 additions & 0 deletions src/main/scala/org/globalforestwatch/layers/MapspamCOCOYield.scala
@@ -0,0 +1,13 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

case class MapspamCOCOYield(gridTile: GridTile, kwargs: Map[String, Any])
extends FloatLayer
with OptionalFLayer {

val datasetName = "mapspam_coco_yield"

val uri: String =
uriForGrid(gridTile, kwargs)
}
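The layer's datasetName links back to the raster-catalog entry added above, and uriForGrid (inherited from the shared layer traits, which are not part of this diff) performs the placeholder substitution. A minimal sketch of that substitution, assuming GridTile exposes gridSize, rowCount, and tileId fields:

// Sketch only; the real logic lives in the shared layer traits.
def expandUri(template: String, tile: GridTile): String =
  template
    .replace("{grid_size}", tile.gridSize.toString)
    .replace("{row_count}", tile.rowCount.toString)
    .replace("{tile_id}", tile.tileId)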
4 changes: 3 additions & 1 deletion src/main/scala/org/globalforestwatch/summarystats/SummaryMain.scala
@@ -10,6 +10,7 @@ import org.globalforestwatch.summarystats.gladalerts.GladAlertsCommand.gladAlertsCommand
import org.globalforestwatch.summarystats.treecoverloss.TreeCoverLossCommand.treeCoverLossCommand
import org.globalforestwatch.summarystats.integrated_alerts.IntegratedAlertsCommand.integratedAlertsCommand
import org.globalforestwatch.summarystats.afi.AFiCommand.afiCommand
import org.globalforestwatch.summarystats.ghg.GHGCommand.ghgCommand
import com.monovore.decline._

object SummaryMain {
@@ -25,7 +26,8 @@ object SummaryMain {
gladAlertsCommand orElse
treeCoverLossCommand orElse
integratedAlertsCommand orElse
afiCommand
afiCommand orElse
ghgCommand
}
val command = Command(name, header, true)(main)

74 changes: 74 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysis.scala
@@ -0,0 +1,74 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.Validated.{Invalid, Valid}
import cats.implicits._

import geotrellis.vector.{Feature, Geometry}
import org.locationtech.jts.geom.Geometry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation}
import org.apache.spark.storage.StorageLevel
import org.globalforestwatch.ValidatedWorkflow

object GHGAnalysis extends SummaryAnalysis {

val name = "ghg"

/** GFW Pro analysis of input features in a TSV file. The TSV file contains
* the individual list items and the merged ("dissolved") list geometry.
* - Individual list items have location IDs >= 0
* - Merged list geometry has location ID -1
*
* The merged list may or may not be present.
*
* This function assumes that all features have already been split by the 1x1-degree
* grid, so each location and the merged list may have one or more rows.
*/
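// Hypothetical illustration of the input convention (IDs and cells are
// made up): a location split across two 1x1-degree cells arrives as two
// rows with locationId 0, while the dissolved list geometry arrives as
// one or more rows with locationId -1.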
def apply(
features: RDD[ValidatedLocation[Geometry]],
kwargs: Map[String, Any]
)(implicit spark: SparkSession): RDD[ValidatedLocation[GHGData]] = {
features.persist(StorageLevel.MEMORY_AND_DISK)

try {
val partialResult: RDD[ValidatedLocation[GHGData]] = {
ValidatedWorkflow(features)
.flatMap { locationGeometries =>
val locationSummaries: RDD[ValidatedLocation[GHGSummary]] = {
val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) }

// This is where the main analysis happens, in ErrorSummaryRDD.apply(),
// which eventually calls into GHGSummary via
// runPolygonalSummary().
GHGRDD(tmp, GHGGrid.blockTileGrid, kwargs)
}

// For all rows that didn't get an error from the GHG analysis, do the
// transformation from GHGSummary to
// GHGData
ValidatedWorkflow(locationSummaries).mapValid { summaries =>
summaries
.mapValues {
case summary: GHGSummary =>
val data = summary.toGHGData()
data
}
}
}
.unify
.persist(StorageLevel.MEMORY_AND_DISK)
}

partialResult.map {
case Valid(Location(fid, data)) if data.equals(GHGData.empty) =>
Invalid(Location(fid, NoIntersectionError))
case data => data
}
} catch {
case e: StackOverflowError =>
e.printStackTrace()
throw e
}
}
}
50 changes: 50 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala
@@ -0,0 +1,50 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.NonEmptyList
import org.globalforestwatch.summarystats.SummaryCommand
import cats.implicits._
import com.monovore.decline.Opts
import org.globalforestwatch.features._
import com.typesafe.scalalogging.LazyLogging
import org.globalforestwatch.config.GfwConfig
import org.globalforestwatch.util.Config

object GHGCommand extends SummaryCommand with LazyLogging {
// Current range of years for which to compute emissions factors.
// Update GHGYearEnd when new tree loss data becomes available.
val GHGYearStart: Int = 2020
val GHGYearEnd: Int = 2023

val ghgCommand: Opts[Unit] = Opts.subcommand(
name = GHGAnalysis.name,
help = "Compute greenhouse gas emissions factors for GFW Pro locations."
) {
(
defaultOptions,
featureFilterOptions
).mapN { (default, filterOptions) =>
val kwargs = Map(
"outputUrl" -> default.outputUrl,
"noOutputPathSuffix" -> default.noOutputPathSuffix,
"overwriteOutput" -> default.overwriteOutput,
// Pin the version of gfw_integrated_alerts, so we don't make a data API request for 'latest'
"config" -> GfwConfig.get(Some(NonEmptyList.one(Config("gfw_integrated_alerts", "v20231121"))))
)

if (!default.splitFeatures) logger.warn("Forcing splitFeatures = true")
val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)

runAnalysis { implicit spark =>
val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true)

val fcdRDD = GHGAnalysis(
featureRDD,
kwargs
)

val fcdDF = GHGDF.getFeatureDataFrame(fcdRDD, spark)
GHGExport.export(default.featureType, fcdDF, default.outputUrl, kwargs)
}
}
}
}
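A sketch of invoking the new subcommand through SummaryMain. The option names come from the shared SummaryCommand defaults, which are outside this diff, so treat them as assumptions:

org.globalforestwatch.summarystats.SummaryMain ghg \
  --features s3://<bucket>/<list>.tsv \
  --feature_type gfwpro \
  --output s3://<bucket>/ghg-output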
41 changes: 41 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDF.scala
@@ -0,0 +1,41 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.Validated.{Invalid, Valid}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.globalforestwatch.features._
import org.globalforestwatch.summarystats.{ValidatedLocation, Location}
import org.globalforestwatch.util.Util.fieldsFromCol
import org.globalforestwatch.summarystats.SummaryDF
import org.globalforestwatch.summarystats.SummaryDF.{RowError, RowId}

object GHGDF extends SummaryDF {

def getFeatureDataFrame(
dataRDD: RDD[ValidatedLocation[GHGData]],
spark: SparkSession
): DataFrame = {
import spark.implicits._

val rowId: FeatureId => RowId = {
case gfwproId: GfwProFeatureId =>
RowId(gfwproId.listId, gfwproId.locationId.toString)
case id =>
throw new IllegalArgumentException(s"Can't produce DataFrame for $id")
}

dataRDD.map {
case Valid(Location(fid, data)) =>
(rowId(fid), RowError.empty, data)
case Invalid(Location(fid, err)) =>
(rowId(fid), RowError.fromJobError(err), GHGData.empty)
}
.toDF("id", "error", "data")
.select($"id.*" :: $"error.*" :: fieldsFromCol($"data", featureFields): _*)
}

val featureFields = List(
"total_area",
"emissions_factor_yearly"
)
}
41 changes: 41 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGData.scala
@@ -0,0 +1,41 @@
package org.globalforestwatch.summarystats.ghg

import cats.Semigroup

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

/** Final data for each location.
*/
case class GHGData(
total_area: GHGDataDouble,
emissions_factor_yearly: GHGDataValueYearly
) {

def merge(other: GHGData): GHGData = {

GHGData(
total_area.merge(other.total_area),
emissions_factor_yearly.merge(other.emissions_factor_yearly)
)
}
}

object GHGData {

def empty: GHGData =
GHGData(
GHGDataDouble.empty,
GHGDataValueYearly.empty,
)

implicit val lossDataSemigroup: Semigroup[GHGData] =
new Semigroup[GHGData] {
def combine(x: GHGData,
y: GHGData): GHGData =
x.merge(y)
}

implicit def dataExpressionEncoder: ExpressionEncoder[GHGData] =
frameless.TypedExpressionEncoder[GHGData]
.asInstanceOf[ExpressionEncoder[GHGData]]
}
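Because GHGData has a Semigroup, partial results for the same location (e.g. from different 1x1-degree pieces) can be combined with |+|. A small illustration with made-up numbers:

import cats.implicits._
import scala.collection.immutable.SortedMap

val a = GHGData(GHGDataDouble(1.5), GHGDataValueYearly(SortedMap(2020 -> 0.2)))
val b = GHGData(GHGDataDouble(2.5), GHGDataValueYearly(SortedMap(2020 -> 0.3, 2021 -> 0.1)))

a |+| b
// == GHGData(GHGDataDouble(4.0), GHGDataValueYearly(SortedMap(2020 -> 0.5, 2021 -> 0.1)))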
33 changes: 33 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataDouble.scala
@@ -0,0 +1,33 @@
package org.globalforestwatch.summarystats.ghg
import frameless.Injection
import org.globalforestwatch.util.Implicits._
import io.circe.syntax._


case class GHGDataDouble(value: Double) extends GHGDataParser[GHGDataDouble] {
def merge(
other: GHGDataDouble
): GHGDataDouble = {
GHGDataDouble(value + other.value)
}

def round: Double = this.round(value)

def toJson: String = {
this.round.asJson.noSpaces
}
}

object GHGDataDouble {
def empty: GHGDataDouble =
GHGDataDouble(0)

def fill(value: Double,
include: Boolean = true): GHGDataDouble = {
GHGDataDouble(value * include)
}

implicit def injection: Injection[GHGDataDouble, String] =
Injection(_.toJson, s => GHGDataDouble(s.toDouble))

}
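fill uses the Boolean-to-Int implicit imported from org.globalforestwatch.util.Implicits, so (assuming the usual true -> 1, false -> 0 conversion) an excluded value contributes zero:

GHGDataDouble.fill(3.5)                  // GHGDataDouble(3.5)
GHGDataDouble.fill(3.5, include = false) // GHGDataDouble(0.0)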
37 changes: 37 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataValueYearly.scala
@@ -0,0 +1,37 @@
package org.globalforestwatch.summarystats.ghg

import frameless.Injection

import scala.collection.immutable.SortedMap
import io.circe.syntax._
import io.circe.parser.decode
import cats.kernel.Semigroup
import cats.implicits._

case class GHGDataValueYearly(value: SortedMap[Int, Double])
extends GHGDataParser[GHGDataValueYearly] {

def merge(other: GHGDataValueYearly): GHGDataValueYearly = {
GHGDataValueYearly(Semigroup[SortedMap[Int, Double]].combine(value, other.value))
}

def round: SortedMap[Int, Double] = this.value.map { case (key, value) => key -> this.round(value) }

def toJson: String = {
this.round.asJson.noSpaces
}
}

object GHGDataValueYearly {
def empty: GHGDataValueYearly =
GHGDataValueYearly(
SortedMap()
)

def fromString(value: String): GHGDataValueYearly = {
val sortedMap = decode[SortedMap[Int, Double]](value)
GHGDataValueYearly(sortedMap.getOrElse(SortedMap()))
}

implicit def injection: Injection[GHGDataValueYearly, String] = Injection(_.toJson, fromString)
}
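The Injection lets Spark store the yearly map as a JSON string column and read it back. A quick round-trip sketch; the exact decimal rounding comes from GHGDataParser.round, which is not shown in this diff:

import scala.collection.immutable.SortedMap

val y = GHGDataValueYearly(SortedMap(2020 -> 1.25, 2021 -> 2.0))
y.toJson                                     // {"2020":1.25,"2021":2.0}
GHGDataValueYearly.fromString(y.toJson) == y // true (modulo rounding)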
46 changes: 46 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGExport.scala
@@ -0,0 +1,46 @@
package org.globalforestwatch.summarystats.ghg

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.globalforestwatch.summarystats.SummaryExport
import org.globalforestwatch.util.Util.getAnyMapValue

object GHGExport extends SummaryExport {

override val csvOptions: Map[String, String] = Map(
"header" -> "true",
"delimiter" -> "\t",
"quote" -> "\u0000",
"escape" -> "\u0000",
"quoteMode" -> "NONE",
"nullValue" -> null,
"emptyValue" -> null
)

override def export(
featureType: String,
summaryDF: DataFrame,
outputUrl: String,
kwargs: Map[String, Any]
): Unit = {
val saveMode: SaveMode =
if (getAnyMapValue[Boolean](kwargs, "overwriteOutput"))
SaveMode.Overwrite
else
SaveMode.ErrorIfExists

featureType match {
case "gfwpro" =>
summaryDF
.repartition(1)
.write
.mode(saveMode)
.options(csvOptions)
.csv(path = outputUrl + "/final")

case _ =>
throw new IllegalArgumentException(
"Feature type must be 'gfwpro'"
)
}
}
}
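Setting both quote and escape to the null character (\u0000) effectively disables quoting, so the tab-delimited output keeps JSON map values like emissions_factor_yearly verbatim. A sketch of one output row; only total_area and emissions_factor_yearly are defined in this diff, the id columns are assumed to come from SummaryDF's RowId, and the RowError columns are omitted:

list_id   location_id   total_area   emissions_factor_yearly
my_list   0             12.34        {"2020":0.5,"2021":0.6}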
15 changes: 15 additions & 0 deletions src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGrid.scala
@@ -0,0 +1,15 @@
package org.globalforestwatch.summarystats.ghg

import geotrellis.vector.Extent
import org.globalforestwatch.grids.{GridTile, TenByTen30mGrid}

object GHGGrid
extends TenByTen30mGrid[GHGGridSources] {

val gridExtent: Extent = Extent(-180.0000, -90.0000, 180.0000, 90.0000)

def getSources(gridTile: GridTile,
kwargs: Map[String, Any]): GHGGridSources =
GHGGridSources.getCachedSources(gridTile, kwargs)

}