Merge pull request #239 from wri/fix-comments
Fixed a bunch of comments in the Pro geotrellis analyses
danscales authored Jun 24, 2024
2 parents a73e849 + c02c457 commit 79ab006
Showing 10 changed files with 63 additions and 35 deletions.
@@ -88,6 +88,8 @@ object ValidatedFeatureRDD {
}

val envelope: Envelope = spatialFeatureRDD.boundaryEnvelope
+// Since clip is true, we are going to exclude parts or all of any geometries that
+// don't intersect with the TreeCoverLoss extent (e.g. geometries in Antarctica).
val spatialGridRDD = GridRDD(envelope, spark, clip = true)

// flatJoin is a flat list of pairs of (grid cell, featureGeom), giving all the
@@ -101,7 +103,7 @@ object ValidatedFeatureRDD {

/*
partitions will come back very skewed and we will need to even them out for any downstream analysis
-For the summary analysis we will eventually use a range partitioner.
+For the summary analysis we will eventually use a range (or hash) partitioner.
However, the range partitioner uses sampling to come up with the break points for the different partitions.
If the input RDD is already heavily skewed, sampling will be off and the range partitioner won't do a good job.
*/
@@ -118,6 +120,7 @@ object ValidatedFeatureRDD {
.partitionBy(hashPartitioner)
.flatMap { case (_, (gridCell, geom)) =>
val fid = geom.getUserData.asInstanceOf[FeatureId]
+// Here we do the actual intersection to split the geometry by the 1x1 tiles.
validatedIntersection(geom, gridCell)
.leftMap { err => Location(fid, err) }
.map { geoms => geoms.map { geom =>
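To make the partition-then-intersect step above concrete, here is a minimal sketch under simplified assumptions (plain JTS geometries keyed by a grid-cell id; the real code carries a FeatureId in each geometry's user data and returns validated results):

```scala
// Illustrative only: hash-partition by grid cell to even out skew, then
// clip each feature geometry to its 1x1 tile.
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.locationtech.jts.geom.Geometry

def splitFeaturesByGrid(
  keyed: RDD[(Long, (Geometry, Geometry))], // (cellId, (gridCell, featureGeom))
  numPartitions: Int
): RDD[Geometry] =
  keyed
    .partitionBy(new HashPartitioner(numPartitions))
    .flatMap { case (_, (gridCell, geom)) =>
      // The actual intersection that splits the geometry by the 1x1 tile.
      val clipped = geom.intersection(gridCell)
      if (clipped.isEmpty) None else Some(clipped)
    }
```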
@@ -54,12 +54,14 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
}

/*
-* Use a Range Partitioner based on the Z curve value to efficiently and evenly partition RDD for analysis,
+* Use a Hash Partitioner based on the Z curve value to efficiently and evenly partition RDD for analysis,
* but still preserving locality which will both reduce the S3 reads per executor and make it more likely
* for features to be close together already during export.
*/
val partitionedFeatureRDD = if (partition) {
-// if a single tile has more than 4096 features, split it up over partitions
+// Generally, features or parts of features intersecting the same window will
+// go into the same partition. If a single window includes parts of more than
+// 4096 features, then those parts will be split up over multiple partitions.
RepartitionSkewedRDD.bySparseId(keyedFeatureRDD, 4096)
} else {
keyedFeatureRDD.values
@@ -166,8 +168,9 @@
}
}

-/* Group records by Id and combine their summaries
- * The features may have intersected multiple grid blocks
+/* Group records by Id and combine their summaries. The features may have intersected
+ * multiple grid blocks. The combine operation for a SUMMARY is defined in
+ * summaryStats.summarySemigroup, based on its merge method.
*/
val featuresGroupedWithSummaries: RDD[ValidatedLocation[SUMMARY]] =
featuresWithSummaries
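The merge machinery these comments reference is the standard cats map-semigroup pattern; a self-contained sketch with stand-in types (not the repo's actual classes):

```scala
import cats.Semigroup
import cats.implicits._

final case class CellStats(alerts: Long, areaHa: Double)

object CellStats {
  // Per-value combine applied when two maps share the same group key.
  implicit val cellStatsSemigroup: Semigroup[CellStats] =
    Semigroup.instance((a, b) => CellStats(a.alerts + b.alerts, a.areaHa + b.areaHa))
}

final case class DemoSummary(stats: Map[String, CellStats]) {
  // Map.combine merges by key and applies CellStats' semigroup on collisions;
  // this is how records with the same FeatureId get folded into one summary.
  def merge(other: DemoSummary): DemoSummary =
    DemoSummary(stats.combine(other.stats))
}

object MergeDemo extends App {
  val a = DemoSummary(Map("2023" -> CellStats(2, 0.5)))
  val b = DemoSummary(Map("2023" -> CellStats(3, 1.0), "2024" -> CellStats(1, 0.2)))
  println(a.merge(b).stats)
  // Map(2023 -> CellStats(5,1.5), 2024 -> CellStats(1,0.2))
}
```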
@@ -7,14 +7,18 @@ import geotrellis.raster.summary.GridVisitor
import org.globalforestwatch.summarystats.{Summary, summarySemigroup}
import org.globalforestwatch.util.Geodesy

-/** LossData Summary by year */
+/** AFiData broken down by AFiDataGroup. */
case class AFiSummary(
stats: Map[AFiDataGroup, AFiData] = Map.empty
) extends Summary[AFiSummary] {

-/** Combine two Maps and combine their LossData when a year is present in both */
+/** Combine two Maps by combining AFiDataGroup entries that have the same values.
+ * This merge function is used by summaryStats.summarySemigroup to define a
+ * combine operation on AFiSummary, which is used to combine records with the same
+ * FeatureId in ErrorSummaryRDD. */
def merge(other: AFiSummary): AFiSummary = {
-// the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map
+// the stats.combine method uses AFiData.afiDataSemigroup instance to perform
+// per-value combine on the map.
AFiSummary(stats.combine(other.stats))
}

@@ -22,21 +22,14 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {
val name = "forest_change_diagnostic"

/** GFW Pro analysis of input features in a TSV file. The TSV file contains
-the individual list items, the merged list geometry, and the
-geometric difference from the current merged list geometry and the former one.
+the individual list items and the merged ("dissolved") list geometry.
* - Individual list items have location IDs >= 0
* - Merged list geometry has location ID -1
-* - Geometric difference to previous version has location ID -2
*
-Merged list and geometric difference may or may be not present. If geometric
-difference is present, we only need to process chunks
-of the merged list which fall into the same grid cells as the
-geometric difference. Later in the analysis we will then read cached
-values for the remaining chunks and use them to aggregate list level results.
+The merged list may or may not be present.
*
* This function assumes that all features have already been split by 1x1 degree
-grid. This function will exclude diff geometry
-locations from output (id=-2).
+grid, so each location and the merged list may have one or multiple rows.
*/
def apply(
features: RDD[ValidatedLocation[Geometry]],
@@ -83,8 +76,9 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {

// For all rows that didn't get an error from the FCD analysis, do the
// transformation from ForestChangeDiagnosticSummary to
-// ForestChangeDiagnosticData and add commodity risk,
-// commodity_threat_fires, and tree_cover_loss_soy_yearly.
+// ForestChangeDiagnosticData and add commodity risk and
+// commodity_threat_fires (both used by the Palm Risk tool), and
+// tree_cover_loss_soy_yearly.
ValidatedWorkflow(locationSummaries).mapValid { summaries =>
summaries
.mapValues {
@@ -132,8 +126,8 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis {
}
}

-/** Filter only to those rows covered by gridFilter, these are areas where location geometries have changed If gridFilter is empty list,
- * all locations except diff geom will be preserved
+/** Filter only to those rows covered by gridFilter, these are areas where location
+ * geometries have changed. If gridFilter is empty, all locations will be preserved.
*/
def filterDiffGridCells(
rdd: RDD[Location[Geometry]],
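A rough sketch of that filtering rule, with a hypothetical gridId helper standing in for however the 1x1 cell id of a row is derived:

```scala
import org.apache.spark.rdd.RDD
import org.locationtech.jts.geom.Geometry

// gridId is hypothetical: it should return the 1x1 grid id of a geometry.
def filterToGridCells[A](
  rdd: RDD[(A, Geometry)],
  gridFilter: List[String],
  gridId: Geometry => String
): RDD[(A, Geometry)] =
  if (gridFilter.isEmpty) rdd // empty filter: preserve all locations
  else {
    val keep = gridFilter.toSet
    rdd.filter { case (_, geom) => keep.contains(gridId(geom)) }
  }
```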
@@ -220,7 +220,10 @@ case class ForestChangeDiagnosticData(
}

/**
* Do the commodity risk calculations that are needed for the Palm Risk tool.
+*
+* @see https://docs.google.com/presentation/d/1nAq4mFNkv1q5vFvvXWReuLr4Znvr-1q-BDi6pl_5zTU/edit#slide=id.p
+* @see https://docs.google.com/document/d/16-3lMdI4HyjiuHxzEqIgR5p2H_jH3DT0wR9XV7XzuAo/edit#heading=h.vjm582nbtt1d
*/
def withUpdatedCommodityRisk(): ForestChangeDiagnosticData = {

@@ -8,17 +8,24 @@ import org.globalforestwatch.util.Geodesy
import org.globalforestwatch.layers.ApproxYear
import org.globalforestwatch.layers.GFWProCoverage

-/** LossData Summary by year */
+/** ForestChangeDiagnosticRawData broken down by ForestChangeDiagnosticRawDataGroup,
+ * which includes the loss year, but lots of other characteristics as well. */
case class ForestChangeDiagnosticSummary(
stats: Map[ForestChangeDiagnosticRawDataGroup,
ForestChangeDiagnosticRawData] = Map.empty
) extends Summary[ForestChangeDiagnosticSummary] {

-/** Combine two Maps and combine their LossData when a year is present in both */
+/** Combine two Maps by combining ForestChangeDiagnosticRawDataGroup entries that
+ * have the same values. This merge function is used by
+ * summaryStats.summarySemigroup to define a combine operation on
+ * ForestChangeDiagnosticSummary, which is used to combine records with the same
+ * FeatureId in ErrorSummaryRDD. */
def merge(
other: ForestChangeDiagnosticSummary
): ForestChangeDiagnosticSummary = {
-// the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map
+// the stats.combine method uses the
+// ForestChangeDiagnosticRawData.lossDataSemigroup instance to perform per-value
+// combine on the map.
ForestChangeDiagnosticSummary(stats.combine(other.stats))
}

@@ -37,7 +44,7 @@ case class ForestChangeDiagnosticSummary(
}

object ForestChangeDiagnosticSummary {
-// ForestChangeDiagnosticSummary from Raster[ForestChangeDiagnosticTile] -- cell types may not be the same
+// Cell types of Raster[ForestChangeDiagnosticTile] may not be the same.

def getGridVisitor(
kwargs: Map[String, Any]
@@ -40,7 +40,7 @@ object GfwProDashboardAnalysis extends SummaryAnalysis {
val spatialFeatureRDD = RDDAdapter.toSpatialRDDfromLocationRdd(rdd, spark)

/* Enrich the feature RDD by intersecting it with contextual features
-* The resulting FeatuerId carries combined identity of source fature and contextual geometry
+* The resulting FeatureId carries combined identity of source feature and contextual geometry
*/
val enrichedRDD =
SpatialJoinRDD
@@ -81,9 +81,10 @@ object GfwProDashboardAnalysis extends SummaryAnalysis {
}

/** These geometries touch, apply application specific logic of how to treat that.
-* - For intersection of location geometries only keep those where centroid of location is in the contextual geom (this ensures that
-* any location is only assigned to a single contextual area even if it intersects more)
-* - For dissolved geometry of list report all contextual areas it intersects
+* - For intersection of location geometries, only keep those where centroid of
+* location is in the contextual geom (this ensures that any location is only
+* assigned to a single contextual area even if it intersects more).
+* - For dissolved geometry of list, report all contextual areas it intersects
*/
private def refineContextualIntersection(
featureGeom: Geometry,
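Both rules reduce to simple JTS predicates; a minimal sketch, assuming plain Geometry values:

```scala
import org.locationtech.jts.geom.Geometry

// A location is assigned to at most one contextual area: the one that
// contains its centroid.
def keepLocationIntersection(location: Geometry, contextual: Geometry): Boolean =
  contextual.contains(location.getCentroid)

// The dissolved list geometry instead reports every contextual area it touches.
def keepListIntersection(dissolved: Geometry, contextual: Geometry): Boolean =
  dissolved.intersects(contextual)
```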
@@ -8,18 +8,24 @@ import org.globalforestwatch.summarystats.Summary
import org.globalforestwatch.util.Geodesy
import java.time.LocalDate

-/** LossData Summary by year */
+/** GfwProDashboardRawData broken down by GfwProDashboardRawDataGroup, which includes
+ * alert date and confidence, but lots of other characteristics as well. */
case class GfwProDashboardSummary(
stats: Map[GfwProDashboardRawDataGroup, GfwProDashboardRawData] = Map.empty
) extends Summary[GfwProDashboardSummary] {

-/** Combine two Maps and combine their LossData when a year is present in both */
+/** Combine two Maps by combining GfwProDashboardRawDataGroup entries that have the
+ * same values. This merge function is used by summaryStats.summarySemigroup to
+ * define a combine operation on GfwProDashboardSummary, which is used to combine
+ * records with the same FeatureId in ErrorSummaryRDD. */
def merge(other: GfwProDashboardSummary): GfwProDashboardSummary = {
-// the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map
+// the stats.combine method uses the GfwProDashboardRawData.lossDataSemigroup
+// instance to perform per-value combine on the map.
GfwProDashboardSummary(stats.combine(other.stats))
}
def isEmpty = stats.isEmpty

+/** Pivot raw data to GfwProDashboardData and aggregate across alert dates. */
def toGfwProDashboardData(): GfwProDashboardData = {
stats
.map { case (group, data) => group.
2 changes: 1 addition & 1 deletion src/main/scala/org/globalforestwatch/util/GridRDD.scala
@@ -10,7 +10,7 @@ import org.globalforestwatch.util.GeometryConstructor.createPolygon1x1
object GridRDD {
/** Returns an RDD with the set of 1x1 grid cells (polygons) that cover envelope. If
* clip is true, then don't include any grid cells that are not covered by
-* tcl_geom.
+* tcl_geom (which is the extent of the TreeCoverLoss data).
*/
def apply(envelope: Envelope, spark: SparkSession, clip: Boolean = false): PolygonRDD = {
val gridCells = getGridCells(envelope)
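For intuition, a simplified sketch of enumerating the 1x1-degree cells that cover an envelope (the real GridRDD builds cells via createPolygon1x1 and parallelizes them into a PolygonRDD):

```scala
import org.locationtech.jts.geom.{Envelope, GeometryFactory, Polygon}

def gridCellsCovering(env: Envelope): Seq[Polygon] = {
  val gf = new GeometryFactory()
  for {
    x <- math.floor(env.getMinX).toInt until math.ceil(env.getMaxX).toInt
    y <- math.floor(env.getMinY).toInt until math.ceil(env.getMaxY).toInt
  } yield
    // toGeometry of a non-degenerate envelope is a rectangular Polygon.
    gf.toGeometry(new Envelope(x, x + 1, y, y + 1)).asInstanceOf[Polygon]
}
```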
@@ -5,6 +5,13 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner

+// Hash partitioner, where we aim to group all rows with the same id together and put
+// them into the same partition based on hashing. However, if a single id has more
+// than maxPartitionSize rows, we break it up into groups of at most maxPartitionSize
+// and features in each group go to the same partition. The number of partitions is
+// the max of rdd.sparkContext.defaultParallelism and the number of id groups (which
+// is the number of unique ids, plus extra groups because of ids that have more than
+// maxPartitionSize rows).
object RepartitionSkewedRDD {
def bySparseId[A: ClassTag](rdd: RDD[(Long, A)], maxPartitionSize: Int): RDD[A] = {
val counts = rdd.map{ case (id, _) => (id, 1l) }.reduceByKey(_ + _).collect().sortBy(_._2)
@@ -45,4 +52,4 @@ object PartitionSplit {
id -> PartitionSplit(index, splits) :: acc
}.toMap
}
-}
+}
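A condensed, hypothetical sketch of that splitting idea (the real implementation routes rows deterministically through PartitionSplit; here rows of an oversized id are simply scattered at random over that id's groups):

```scala
import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def bySparseIdSketch[A: ClassTag](rdd: RDD[(Long, A)], maxPartitionSize: Int): RDD[A] = {
  // One group per maxPartitionSize rows of each id.
  val groupsPerId: Map[Long, Int] = rdd
    .mapValues(_ => 1L)
    .reduceByKey(_ + _)
    .mapValues(n => math.ceil(n.toDouble / maxPartitionSize).toInt)
    .collectAsMap()
    .toMap
  val numPartitions =
    math.max(rdd.sparkContext.defaultParallelism, groupsPerId.values.sum)
  rdd
    // Route each row to one of its id's groups (non-deterministic in this sketch).
    .map { case (id, row) => ((id, Random.nextInt(groupsPerId(id))), row) }
    .partitionBy(new HashPartitioner(numPartitions))
    .values
}
```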
