Skip to content

Commit

Permalink
Merge pull request #171 from wri/feature/prodes-split
Browse files Browse the repository at this point in the history
Feature/prodes split
  • Loading branch information
solomon-negusse authored Mar 13, 2023
2 parents e6c249d + b40fb90 commit 8865416
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 241 deletions.
12 changes: 6 additions & 6 deletions src/main/resources/raster-catalog-pro.json
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,12 @@
"source_uri":"s3://gfw-data-lake/gfw_managed_forests/v202106/raster/epsg-4326/{grid_size}/{row_count}/is/gdal-geotiff/{tile_id}.tif"
},
{
"name":"inpe_prodes",
"source_uri":"s3://gfw-data-lake/inpe_prodes/v202107/raster/epsg-4326/{grid_size}/{row_count}/is/gdal-geotiff/{tile_id}.tif"
"name":"inpe_amazon_prodes",
"source_uri":"s3://gfw-data-lake/inpe_amazon_prodes/v2021/raster/epsg-4326/{grid_size}/{row_count}/year/geotiff/{tile_id}.tif"
},
{
"name":"inpe_cerrado_prodes",
"source_uri":"s3://gfw-data-lake/inpe_cerrado_prodes/v2021/raster/epsg-4326/{grid_size}/{row_count}/year/geotiff/{tile_id}.tif"
},
{
"name":"gfw_mining_concessions",
Expand Down Expand Up @@ -272,10 +276,6 @@
"name":"umd_soy_planted_area",
"source_uri": "s3://gfw-data-lake/umd_soy_planted_area/v2/raster/epsg-4326/{grid_size}/{row_count}/is__year_2021/gdal-geotiff/{tile_id}.tif"
},
{
"name":"inpe_prodes",
"source_uri": "s3://gfw-data-lake/inpe_prodes/v202107/raster/epsg-4326/{grid_size}/{row_count}/is/gdal-geotiff/{tile_id}.tif"
},
{
"name":"wwf_eco_regions",
"source_uri": "s3://gfw-data-lake/wwf_eco_regions/v2012/raster/epsg-4326/{grid_size}/{row_count}/name/gdal-geotiff/{tile_id}.tif"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

/** Optional integer raster layer backed by the `inpe_amazon_prodes` dataset.
  *
  * The raster location is resolved once, at construction time, by calling
  * `uriForGrid` with the supplied tile and keyword arguments.
  *
  * @param gridTile tile for which the raster URI is resolved
  * @param kwargs   extra options forwarded unchanged to `uriForGrid`
  */
case class ProdesAmazonLossYear(gridTile: GridTile, kwargs: Map[String, Any]) extends IntegerLayer with OptionalILayer {
  val datasetName = "inpe_amazon_prodes"
  val uri: String = uriForGrid(gridTile, kwargs)

  /** Translates a raw raster value into the layer value.
    *
    * @param value raw pixel value read from the raster
    * @return `null` when `value` is 0, otherwise `value` itself (boxed)
    */
  // NOTE(review): 0 presumably encodes "no PRODES loss recorded" for the pixel - confirm against the dataset.
  override def lookup(value: Int): Integer =
    value match {
      case 0    => null
      case year => year
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

/** Optional integer raster layer backed by the `inpe_cerrado_prodes` dataset.
  *
  * The raster location is resolved once, at construction time, by calling
  * `uriForGrid` with the supplied tile and keyword arguments.
  *
  * @param gridTile tile for which the raster URI is resolved
  * @param kwargs   extra options forwarded unchanged to `uriForGrid`
  */
case class ProdesCerradoLossYear(gridTile: GridTile, kwargs: Map[String, Any]) extends IntegerLayer with OptionalILayer {
  val datasetName = "inpe_cerrado_prodes"
  val uri: String = uriForGrid(gridTile, kwargs)

  /** Translates a raw raster value into the layer value.
    *
    * @param value raw pixel value read from the raster
    * @return `value` itself (boxed) when it is non-zero, `null` when it is 0
    */
  // NOTE(review): 0 presumably encodes "no PRODES loss recorded" for the pixel - confirm against the dataset.
  override def lookup(value: Int): Integer =
    value match {
      case nonZero if nonZero != 0 => nonZero
      case _                       => null
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ object ForestChangeDiagnosticDF extends SummaryDF {
throw new IllegalArgumentException(s"Can't produce DataFrame for $id")
}

dataRDD.map {
case Valid(Location(fid, data)) =>
(rowId(fid), RowError.empty, data)
case Invalid(Location(fid, err)) =>
(rowId(fid), RowError.fromJobError(err), ForestChangeDiagnosticData.empty)
}
dataRDD
.map {
case Valid(Location(fid, data)) =>
(rowId(fid), RowError.empty, data)
case Invalid(Location(fid, err)) =>
(rowId(fid), RowError.fromJobError(err), ForestChangeDiagnosticData.empty)
}
.toDF("id", "error", "data")
.select($"id.*" :: $"error.*" :: fieldsFromCol($"data", featureFields): _*)
}
Expand All @@ -52,25 +53,27 @@ object ForestChangeDiagnosticDF extends SummaryDF {
throw new IllegalArgumentException("Not a CombinedFeatureId")
}

dataRDD.map {
case Valid(Location(fid, data)) =>
(rowId(fid), RowError.empty, data)
case Invalid(Location(fid, err)) =>
(rowId(fid), RowError.fromJobError(err), ForestChangeDiagnosticData.empty)
}
dataRDD
.map {
case Valid(Location(fid, data)) =>
(rowId(fid), RowError.empty, data)
case Invalid(Location(fid, err)) =>
(rowId(fid), RowError.fromJobError(err), ForestChangeDiagnosticData.empty)
}
.toDF("id", "error", "data")
.select($"id.*" :: $"error.*" :: fieldsFromCol($"data", featureFields) ::: fieldsFromCol($"data", gridFields): _*)
}

def readIntermidateRDD(
sources: NonEmptyList[String],
spark: SparkSession,
spark: SparkSession
): RDD[ValidatedLocation[ForestChangeDiagnosticData]] = {
val df = FeatureDF(sources, GfwProFeature, FeatureFilter.empty, spark)
val ds = df.select(
colsFor[RowGridId].as[RowGridId],
colsFor[RowError].as[RowError],
colsFor[ForestChangeDiagnosticData].as[ForestChangeDiagnosticData])
colsFor[ForestChangeDiagnosticData].as[ForestChangeDiagnosticData]
)

ds.rdd.map { case (id, error, data) =>
if (error.status_code == 2) Valid(Location(id.toFeatureID, data))
Expand Down Expand Up @@ -101,9 +104,12 @@ object ForestChangeDiagnosticDF extends SummaryDF {
"tree_cover_loss_soy_yearly", // treeCoverLossSoyPlanedAreasYearly
"tree_cover_loss_idn_legal_yearly", // treeCoverLossIDNForestAreaYearly
"tree_cover_loss_idn_forest_moratorium_yearly", // treeCoverLossIDNForestMoratoriumYearly
"tree_cover_loss_prodes_yearly", // prodesLossYearly
"tree_cover_loss_prodes_wdpa_yearly", // prodesLossProtectedAreasYearly
"tree_cover_loss_prodes_primary_forest_yearly", // prodesLossProdesPrimaryForestYearly
"tree_cover_loss_prodes_amazon_yearly", // prodesLossAmazonYearly
"tree_cover_loss_prodes_cerrado_yearly", // prodesLossCerradoYearly
"tree_cover_loss_prodes_amazon_wdpa_yearly", // prodesLossAmazonProtectedAreasYearly
"tree_cover_loss_prodes_cerrado_wdpa_yearly", // prodesLossCerradoProtectedAreasYearly
"tree_cover_loss_prodes_amazon_primary_forest_yearly", // prodesLossProdesAmazonPrimaryForestYearly
"tree_cover_loss_prodes_cerrado_primary_forest_yearly", // prodesLossProdesCerradoPrimaryForestYearly
"tree_cover_loss_brazil_biomes_yearly", // treeCoverLossBRABiomesYearly
"tree_cover_extent_total", // treeCoverExtent
"tree_cover_extent_primary_forest", // treeCoverExtentPrimaryForest
Expand Down Expand Up @@ -149,4 +155,4 @@ object ForestChangeDiagnosticDF extends SummaryDF {
"plantation_in_protected_areas_area" //plantationInProtectedAreasArea
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ case class ForestChangeDiagnosticData(
/** treeCoverLossIDNForestAreaYearly */
tree_cover_loss_idn_legal_yearly: ForestChangeDiagnosticDataLossYearlyCategory,
tree_cover_loss_idn_forest_moratorium_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_amazon_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_cerrado_yearly: ForestChangeDiagnosticDataLossYearly,
/** prodesLossAmazonProtectedAreasYearly */
tree_cover_loss_prodes_wdpa_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_primary_forest_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_amazon_wdpa_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_cerrado_wdpa_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_amazon_primary_forest_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_prodes_cerrado_primary_forest_yearly: ForestChangeDiagnosticDataLossYearly,
tree_cover_loss_brazil_biomes_yearly: ForestChangeDiagnosticDataLossYearlyCategory,
tree_cover_extent_total: ForestChangeDiagnosticDataDouble,
tree_cover_extent_primary_forest: ForestChangeDiagnosticDataDouble,
Expand Down Expand Up @@ -116,12 +119,19 @@ case class ForestChangeDiagnosticData(
tree_cover_loss_idn_forest_moratorium_yearly.merge(
other.tree_cover_loss_idn_forest_moratorium_yearly
),
tree_cover_loss_prodes_yearly.merge(other.tree_cover_loss_prodes_yearly),
tree_cover_loss_prodes_wdpa_yearly.merge(
other.tree_cover_loss_prodes_wdpa_yearly
tree_cover_loss_prodes_amazon_yearly.merge(other.tree_cover_loss_prodes_amazon_yearly),
tree_cover_loss_prodes_cerrado_yearly.merge(other.tree_cover_loss_prodes_cerrado_yearly),
tree_cover_loss_prodes_amazon_wdpa_yearly.merge(
other.tree_cover_loss_prodes_amazon_wdpa_yearly
),
tree_cover_loss_prodes_primary_forest_yearly.merge(
other.tree_cover_loss_prodes_primary_forest_yearly
tree_cover_loss_prodes_cerrado_wdpa_yearly.merge(
other.tree_cover_loss_prodes_cerrado_wdpa_yearly
),
tree_cover_loss_prodes_amazon_primary_forest_yearly.merge(
other.tree_cover_loss_prodes_amazon_primary_forest_yearly
),
tree_cover_loss_prodes_cerrado_primary_forest_yearly.merge(
other.tree_cover_loss_prodes_cerrado_primary_forest_yearly
),
tree_cover_loss_brazil_biomes_yearly.merge(other.tree_cover_loss_brazil_biomes_yearly),
tree_cover_extent_total.merge(other.tree_cover_extent_total),
Expand Down Expand Up @@ -171,25 +181,27 @@ case class ForestChangeDiagnosticData(
)
}

/**
* @see https://docs.google.com/presentation/d/1nAq4mFNkv1q5vFvvXWReuLr4Znvr-1q-BDi6pl_5zTU/edit#slide=id.p
/** @see
* https://docs.google.com/presentation/d/1nAq4mFNkv1q5vFvvXWReuLr4Znvr-1q-BDi6pl_5zTU/edit#slide=id.p
*/
def withUpdatedCommodityRisk(): ForestChangeDiagnosticData = {

/* Exclude the last year, limit data to 2021 to sync with palm risk tool:
commodity_threat_deforestation, commodity_threat_peat, commodity_threat_protected_areas use year n and year n-1.
Including information from the current year would under-represent these values as it's in progress.
*/
*/
val minLossYear = ForestChangeDiagnosticDataLossYearly.prefilled.value.keys.min
val maxLossYear = 2021
val years: List[Int] = List.range(minLossYear + 1, maxLossYear + 1)

val forestValueIndicator: ForestChangeDiagnosticDataValueYearly =
ForestChangeDiagnosticDataValueYearly.fill(
filtered_tree_cover_extent.value,
filtered_tree_cover_loss_yearly.value,
2
).limitToMaxYear(maxLossYear)
ForestChangeDiagnosticDataValueYearly
.fill(
filtered_tree_cover_extent.value,
filtered_tree_cover_loss_yearly.value,
2
)
.limitToMaxYear(maxLossYear)

val peatValueIndicator: ForestChangeDiagnosticDataValueYearly =
ForestChangeDiagnosticDataValueYearly.fill(peat_area.value).limitToMaxYear(maxLossYear)
Expand All @@ -200,66 +212,69 @@ case class ForestChangeDiagnosticData(
val deforestationThreatIndicator: ForestChangeDiagnosticDataLossYearly =
ForestChangeDiagnosticDataLossYearly(
SortedMap(
years.map(
year =>
(year, {
years.map(year =>
(
year, {
// Somehow the compiler cannot infer the types correctly
// I hence declare them here explicitly to help him out.
val thisYearLoss: Double =
filtered_tree_cover_loss_yearly.value
.getOrElse(year, 0)
filtered_tree_cover_loss_yearly.value
.getOrElse(year, 0)

val lastYearLoss: Double =
filtered_tree_cover_loss_yearly.value
.getOrElse(year - 1, 0)

thisYearLoss + lastYearLoss
})
}
)
): _*
)
).limitToMaxYear(maxLossYear)

val peatThreatIndicator: ForestChangeDiagnosticDataLossYearly =
ForestChangeDiagnosticDataLossYearly(
SortedMap(
years.map(
year =>
(year, {
years.map(year =>
(
year, {
// Somehow the compiler cannot infer the types correctly
// I hence declare them here explicitly to help him out.
val thisYearPeatLoss: Double =
filtered_tree_cover_loss_peat_yearly.value
.getOrElse(year, 0)
filtered_tree_cover_loss_peat_yearly.value
.getOrElse(year, 0)

val lastYearPeatLoss: Double =
filtered_tree_cover_loss_peat_yearly.value
.getOrElse(year - 1, 0)

thisYearPeatLoss + lastYearPeatLoss + plantation_on_peat_area.value

})
}
)
): _*
)
).limitToMaxYear(maxLossYear)

val protectedAreaThreatIndicator: ForestChangeDiagnosticDataLossYearly =
ForestChangeDiagnosticDataLossYearly(
SortedMap(
years.map(
year =>
(year, {
years.map(year =>
(
year, {
// Somehow the compiler cannot infer the types correctly
// I hence declare them here explicitly to help him out.
val thisYearProtectedAreaLoss: Double =
filtered_tree_cover_loss_protected_areas_yearly.value
.getOrElse(year, 0)
filtered_tree_cover_loss_protected_areas_yearly.value
.getOrElse(year, 0)

val lastYearProtectedAreaLoss: Double =
filtered_tree_cover_loss_protected_areas_yearly.value
.getOrElse(year - 1, 0)

thisYearProtectedAreaLoss + lastYearProtectedAreaLoss + plantation_in_protected_areas_area.value
})
}
)
): _*
)
).limitToMaxYear(maxLossYear)
Expand All @@ -270,7 +285,8 @@ case class ForestChangeDiagnosticData(
commodity_value_protected_areas = protectedAreaValueIndicator,
commodity_threat_deforestation = deforestationThreatIndicator,
commodity_threat_peat = peatThreatIndicator,
commodity_threat_protected_areas = protectedAreaThreatIndicator)
commodity_threat_protected_areas = protectedAreaThreatIndicator
)
}

}
Expand All @@ -294,6 +310,9 @@ object ForestChangeDiagnosticData {
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearlyCategory.empty,
ForestChangeDiagnosticDataDouble.empty,
ForestChangeDiagnosticDataDouble.empty,
Expand Down Expand Up @@ -332,17 +351,17 @@ object ForestChangeDiagnosticData {
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty,
ForestChangeDiagnosticDataLossYearly.empty
)

implicit val lossDataSemigroup: Semigroup[ForestChangeDiagnosticData] =
new Semigroup[ForestChangeDiagnosticData] {
def combine(x: ForestChangeDiagnosticData,
y: ForestChangeDiagnosticData): ForestChangeDiagnosticData =
def combine(x: ForestChangeDiagnosticData, y: ForestChangeDiagnosticData): ForestChangeDiagnosticData =
x.merge(y)
}

implicit def dataExpressionEncoder: ExpressionEncoder[ForestChangeDiagnosticData] =
frameless.TypedExpressionEncoder[ForestChangeDiagnosticData]
frameless
.TypedExpressionEncoder[ForestChangeDiagnosticData]
.asInstanceOf[ExpressionEncoder[ForestChangeDiagnosticData]]
}
Loading

0 comments on commit 8865416

Please sign in to comment.