diff --git a/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala b/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala index 62024269..616ebb09 100644 --- a/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala +++ b/src/main/scala/org/globalforestwatch/features/ValidatedFeatureRDD.scala @@ -88,6 +88,8 @@ object ValidatedFeatureRDD { } val envelope: Envelope = spatialFeatureRDD.boundaryEnvelope + // Since clip is true, we are going to exclude part or all of geometries that + // don't intersect with the TreeCoverLoss extent (e.g. geometries in Antarctica). val spatialGridRDD = GridRDD(envelope, spark, clip = true) // flatJoin is a flat list of pairs of (grid cell, featureGeom), giving all the @@ -101,7 +103,7 @@ object ValidatedFeatureRDD { /* partitions will come back very skewed and we will need to even them out for any downstream analysis - For the summary analysis we will eventually use a range partitioner. + For the summary analysis we will eventually use a range (or hash) partitioner. However, the range partitioner uses sampling to come up with the break points for the different partitions. If the input RDD is already heavily skewed, sampling will be off and the range partitioner won't do a good job. */ @@ -118,6 +120,7 @@ object ValidatedFeatureRDD { .partitionBy(hashPartitioner) .flatMap { case (_, (gridCell, geom)) => val fid = geom.getUserData.asInstanceOf[FeatureId] + // Here we do the actual intersection to split the geometry by the 1x1 tiles. validatedIntersection(geom, gridCell) .leftMap { err => Location(fid, err) } .map { geoms => geoms.map { geom => diff --git a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala index a27211ab..cf96168a 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala @@ -54,12 +54,14 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { } /* - * Use a Range Partitioner based on the Z curve value to efficiently and evenly partition RDD for analysis, + * Use a Hash Partitioner based on the Z curve value to efficiently and evenly partition RDD for analysis, * but still preserving locality which will both reduce the S3 reads per executor and make it more likely * for features to be close together already during export. */ val partitionedFeatureRDD = if (partition) { - // if a single tile has more than 4096 features, split it up over partitions + // Generally, features or parts of features intersecting the same window will + // go into the same partition. If a single window includes parts of more than + // 4096 features, then those parts will be split up over multiple partitions. RepartitionSkewedRDD.bySparseId(keyedFeatureRDD, 4096) } else { keyedFeatureRDD.values @@ -166,8 +168,9 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { } } - /* Group records by Id and combine their summaries - * The features may have intersected multiple grid blocks + /* Group records by Id and combine their summaries. The features may have intersected + * multiple grid blocks. The combine operation for a SUMMARY is defined in + * summaryStats.summarySemigroup, based on its merge method. */ val featuresGroupedWithSummaries: RDD[ValidatedLocation[SUMMARY]] = featuresWithSummaries diff --git a/src/main/scala/org/globalforestwatch/summarystats/afi/AFiSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/afi/AFiSummary.scala index 5257ce25..4c978df3 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/afi/AFiSummary.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/afi/AFiSummary.scala @@ -7,14 +7,18 @@ import geotrellis.raster.summary.GridVisitor import org.globalforestwatch.summarystats.{Summary, summarySemigroup} import org.globalforestwatch.util.Geodesy -/** LossData Summary by year */ +/** AFiData broken down by AFiDataGroup. */ case class AFiSummary( stats: Map[AFiDataGroup, AFiData] = Map.empty ) extends Summary[AFiSummary] { - /** Combine two Maps and combine their LossData when a year is present in both */ + /** Combine two Maps and combining AFIDataGroup entries that have the same values. + * This merge function is used by summaryStats.summarySemigroup to define a + * combine operation on AFiSummary, which is used to combine records with the same + * FeatureId in ErrorSummaryRDD. */ def merge(other: AFiSummary): AFiSummary = { - // the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map + // the stats.combine method uses AFiData.afiDataSemigroup instance to perform + // per-value combine on the map. AFiSummary(stats.combine(other.stats)) } diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala index 89fcf722..b02f4d6a 100755 --- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticAnalysis.scala @@ -22,21 +22,14 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis { val name = "forest_change_diagnostic" /** GFW Pro analysis of input features in a TSV file. The TSV file contains - * the individual list items, the merged list geometry, and the - * geometric difference from the current merged list geometry and the former one. + * the individual list items and the merged ("dissolved") list geometry. * - Individual list items have location IDs >= 0 * - Merged list geometry has location ID -1 - * - Geometric difference to previous version has location ID -2 * - * Merged list and geometric difference may or may be not present. If geometric - * difference is present, we only need to process chunks - * of the merged list which fall into the same grid cells as the - * geometric difference. Later in the analysis we will then read cached - * values for the remaining chunks and use them to aggregate list level results. + * The merged list may or may be not present. * * This function assumes that all features have already been split by 1x1 degree - * grid. This function will exclude diff geometry - * locations from output (id=-2). + * grid, so each location and merged list may have a single or multiple rows. */ def apply( features: RDD[ValidatedLocation[Geometry]], @@ -83,8 +76,9 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis { // For all rows that didn't get an error from the FCD analysis, do the // transformation from ForestChangeDiagnosticSummary to - // ForestChangeDiagnosticData and add commodity risk, - // commodity_threat_fires, and tree_cover_loss_soy_yearly. + // ForestChangeDiagnosticData and add commodity risk and + // commodity_threat_fires (both used by the Palm Risk tool), and + // tree_cover_loss_soy_yearly. ValidatedWorkflow(locationSummaries).mapValid { summaries => summaries .mapValues { @@ -132,8 +126,8 @@ object ForestChangeDiagnosticAnalysis extends SummaryAnalysis { } } - /** Filter only to those rows covered by gridFilter, these are areas where location geometries have changed If gridFilter is empty list, - * all locations except diff geom will be preserved + /** Filter only to those rows covered by gridFilter, these are areas where location + * geometries have changed. If gridFilter is empty, all locations will be preserved. */ def filterDiffGridCells( rdd: RDD[Location[Geometry]], diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticData.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticData.scala index 20b943c7..48a2c4b2 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticData.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticData.scala @@ -220,7 +220,10 @@ case class ForestChangeDiagnosticData( } /** + * Do the commodity risk calculations that are needed for the Palm Risk tool. + * * @see https://docs.google.com/presentation/d/1nAq4mFNkv1q5vFvvXWReuLr4Znvr-1q-BDi6pl_5zTU/edit#slide=id.p + * @see https://docs.google.com/document/d/16-3lMdI4HyjiuHxzEqIgR5p2H_jH3DT0wR9XV7XzuAo/edit#heading=h.vjm582nbtt1d */ def withUpdatedCommodityRisk(): ForestChangeDiagnosticData = { diff --git a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticSummary.scala index 48d56cf0..ada8e110 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticSummary.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/forest_change_diagnostic/ForestChangeDiagnosticSummary.scala @@ -8,17 +8,24 @@ import org.globalforestwatch.util.Geodesy import org.globalforestwatch.layers.ApproxYear import org.globalforestwatch.layers.GFWProCoverage -/** LossData Summary by year */ +/** ForestChangeDiagnosticRawData broken down by ForestChangeDiagnosticRawDataGroup, + * which includes the loss year, but lots of other characteristics as well. */ case class ForestChangeDiagnosticSummary( stats: Map[ForestChangeDiagnosticRawDataGroup, ForestChangeDiagnosticRawData] = Map.empty ) extends Summary[ForestChangeDiagnosticSummary] { - /** Combine two Maps and combine their LossData when a year is present in both */ + /** Combine two Maps by combining ForestChangeDiagnosticRawDataGroup entries that + * have the same values. This merge function is used by + * summaryStats.summarySemigroup to define a combine operation on + * ForestChangeDiagnosticSummary, which is used to combine records with the same + * FeatureId in ErrorSummaryRDD. */ def merge( other: ForestChangeDiagnosticSummary ): ForestChangeDiagnosticSummary = { - // the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map + // the stats.combine method uses the + // ForestChangeDiagnosticRawData.lossDataSemigroup instance to perform per-value + // combine on the map. ForestChangeDiagnosticSummary(stats.combine(other.stats)) } @@ -37,7 +44,7 @@ case class ForestChangeDiagnosticSummary( } object ForestChangeDiagnosticSummary { - // ForestChangeDiagnosticSummary from Raster[ForestChangeDiagnosticTile] -- cell types may not be the same + // Cell types of Raster[ForestChangeDiagnosticTile] may not be the same. def getGridVisitor( kwargs: Map[String, Any] diff --git a/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardAnalysis.scala b/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardAnalysis.scala index 7306ad51..e3fbeb44 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardAnalysis.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardAnalysis.scala @@ -40,7 +40,7 @@ object GfwProDashboardAnalysis extends SummaryAnalysis { val spatialFeatureRDD = RDDAdapter.toSpatialRDDfromLocationRdd(rdd, spark) /* Enrich the feature RDD by intersecting it with contextual features - * The resulting FeatuerId carries combined identity of source fature and contextual geometry + * The resulting FeatureId carries combined identity of source feature and contextual geometry */ val enrichedRDD = SpatialJoinRDD @@ -81,9 +81,10 @@ object GfwProDashboardAnalysis extends SummaryAnalysis { } /** These geometries touch, apply application specific logic of how to treat that. - * - For intersection of location geometries only keep those where centroid of location is in the contextual geom (this ensures that - * any location is only assigned to a single contextual area even if it intersects more) - * - For dissolved geometry of list report all contextual areas it intersects + * - For intersection of location geometries, only keep those where centroid of + * location is in the contextual geom (this ensures that any location is only + * assigned to a single contextual area even if it intersects more). + * - For dissolved geometry of list, report all contextual areas it intersects */ private def refineContextualIntersection( featureGeom: Geometry, diff --git a/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardSummary.scala index d896f22d..7894ab31 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardSummary.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/gfwpro_dashboard/GfwProDashboardSummary.scala @@ -8,18 +8,24 @@ import org.globalforestwatch.summarystats.Summary import org.globalforestwatch.util.Geodesy import java.time.LocalDate -/** LossData Summary by year */ +/** GfwProDashboardRawData broken down by GfwProDashboardRawDataGroup, which includes + * alert date and confidence, but lots of other characteristics as well. */ case class GfwProDashboardSummary( stats: Map[GfwProDashboardRawDataGroup, GfwProDashboardRawData] = Map.empty ) extends Summary[GfwProDashboardSummary] { - /** Combine two Maps and combine their LossData when a year is present in both */ + /** Combine two Maps by combining GfwProDashboardRawDataGroup entries that have the + * same values. This merge function is used by summaryStats.summarySemigroup to + * define a combine operation on GfwProDashboardSummary, which is used to combine + * records with the same FeatureId in ErrorSummaryRDD. */ def merge(other: GfwProDashboardSummary): GfwProDashboardSummary = { - // the years.combine method uses LossData.lossDataSemigroup instance to perform per value combine on the map + // the stats.combine method uses the GfwProDashboardRawData.lossDataSemigroup + // instance to perform per-value combine on the map. GfwProDashboardSummary(stats.combine(other.stats)) } def isEmpty = stats.isEmpty + /** Pivot raw data to GfwProDashboardData and aggregate across alert dates. */ def toGfwProDashboardData(): GfwProDashboardData = { stats .map { case (group, data) => group. diff --git a/src/main/scala/org/globalforestwatch/util/GridRDD.scala b/src/main/scala/org/globalforestwatch/util/GridRDD.scala index 308a64e1..021da9ec 100644 --- a/src/main/scala/org/globalforestwatch/util/GridRDD.scala +++ b/src/main/scala/org/globalforestwatch/util/GridRDD.scala @@ -10,7 +10,7 @@ import org.globalforestwatch.util.GeometryConstructor.createPolygon1x1 object GridRDD { /** Returns an RDD with the set of 1x1 grid cells (polygons) that cover envelope. If * clip is true, then don't include any grid cells that are not covered by - * tcl_geom. + * tcl_geom (which is the extent of the TreeCoverLoss data). */ def apply(envelope: Envelope, spark: SparkSession, clip: Boolean = false): PolygonRDD = { val gridCells = getGridCells(envelope) diff --git a/src/main/scala/org/globalforestwatch/util/PartitionSkewedRDD.scala b/src/main/scala/org/globalforestwatch/util/PartitionSkewedRDD.scala index 933c404e..4c51fadb 100644 --- a/src/main/scala/org/globalforestwatch/util/PartitionSkewedRDD.scala +++ b/src/main/scala/org/globalforestwatch/util/PartitionSkewedRDD.scala @@ -5,6 +5,13 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.HashPartitioner +// Hash partitioner, where we aim to group all rows with same id together and put +// them into the same partition based on hashing. However, if a single id has more +// than maxPartitionSize rows, we break it up into groups of at most maxPartitionSize +// and features in each group go to the same partition. The number of partitions is +// the max of rdd.sparkContext.defaultParallelism, or the number of id groups (which +// is the number of unique ids, plus extra groups because of ids that have more than +// maxPartitionSize rows). object RepartitionSkewedRDD { def bySparseId[A: ClassTag](rdd: RDD[(Long, A)], maxPartitionSize: Int): RDD[A] = { val counts = rdd.map{ case (id, _) => (id, 1l) }.reduceByKey(_ + _).collect().sortBy(_._2) @@ -45,4 +52,4 @@ object PartitionSplit { id -> PartitionSplit(index, splits) :: acc }.toMap } -} \ No newline at end of file +}