diff --git a/src/main/scala/org/globalforestwatch/features/FeatureRDD.scala b/src/main/scala/org/globalforestwatch/features/FeatureRDD.scala index e9d50082..ec542f93 100644 --- a/src/main/scala/org/globalforestwatch/features/FeatureRDD.scala +++ b/src/main/scala/org/globalforestwatch/features/FeatureRDD.scala @@ -145,12 +145,11 @@ object FeatureRDD { Adapter.toSpatialRdd(featureDF, "polyshape") spatialRDD.analyze() - spatialRDD.rawSpatialRDD = spatialRDD.rawSpatialRDD.rdd.map { geom: Geometry => val featureId = FeatureId.fromUserData(featureType, geom.getUserData.asInstanceOf[String], delimiter = ",") geom.setUserData(featureId) geom - } + }.repartition(numPartitions = spatialRDD.rawSpatialRDD.getNumPartitions * 10) splitGeometries(spatialRDD, spark) } diff --git a/src/main/scala/org/globalforestwatch/summarystats/SummaryRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/SummaryRDD.scala index baf11396..7cbbac6b 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/SummaryRDD.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/SummaryRDD.scala @@ -8,6 +8,7 @@ import geotrellis.layer.{LayoutDefinition, SpatialKey} import geotrellis.raster.summary.polygonal.{NoIntersection, PolygonalSummaryResult} import geotrellis.raster.summary.polygonal import geotrellis.store.index.zcurve.Z2 +import geotrellis.vector.Extent.toPolygon import geotrellis.vector._ import org.apache.spark.RangePartitioner import org.apache.spark.rdd.RDD @@ -94,8 +95,8 @@ trait SummaryRDD extends LazyLogging with java.io.Serializable { } val groupedByKey - : Map[SpatialKey, - Array[(SpatialKey, Feature[Geometry, FEATUREID])]] = + : Map[SpatialKey, + Array[(SpatialKey, Feature[Geometry, FEATUREID])]] = windowFeature.toArray.groupBy { case (windowKey, feature) => windowKey } @@ -112,89 +113,64 @@ trait SummaryRDD extends LazyLogging with java.io.Serializable { readWindow(rs, windowKey, windowLayout) } - // Intersect with window extent and group by resulting geom, so we only do polygonal summary - // once per each geometry tile. This will speed up overlapping geometries a lot, where often we're - // analyzing the full window extent across many features. - val groupedByWindowGeom: Map[Geometry, Array[Feature[Geometry, FEATUREID]]] = features.groupBy { - case feature: Feature[Geometry, FEATUREID] => - val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey) - - try { - if (feature.geom.isValid) { - // if valid geometry, attempt to intersect - feature.geom.intersection(windowGeom) - } else { - // otherwise, just use original geometry, since sometimes - // making complex geometries valid can be very slow - feature.geom - } - } catch { - case e: org.locationtech.jts.geom.TopologyException => - // fallback to original geometry if there are any intersection issues - feature.geom - } - } - - groupedByWindowGeom.flatMap { - case (windowGeom: Geometry, features: Array[Feature[Geometry, FEATUREID]]) => - val rasterizeOptions = Rasterizer.Options( - includePartial = false, - sampleType = PixelIsPoint - ) - - maybeRaster match { - case Left(exception) => - logger.error(s"Raster could not bread at spatial key:\n$windowKey\n$exception") - List.empty - - case Right(raster) => - val summary: Option[SUMMARY] = - try { - runPolygonalSummary( - raster, - windowGeom, - rasterizeOptions, - kwargs - ) match { - case polygonal.Summary(result: SUMMARY) => Some(result) - case NoIntersection => None - } - } catch { - case ise: java.lang.IllegalStateException => { - println( - s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}" - ) - // TODO some very invalid geoms are somehow getting here, skip for now - None - } - case te: org.locationtech.jts.geom.TopologyException => { - println( - s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}" - ) - None - } - case be: java.lang.ArrayIndexOutOfBoundsException => { - println( - s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}" - ) - None - } - case ise: java.lang.IllegalArgumentException => { - println( - s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}" - ) - None - } - + // flatMap here flattens out and ignores the errors + features.flatMap { feature: Feature[Geometry, FEATUREID] => + val id: FEATUREID = feature.data + val rasterizeOptions = Rasterizer.Options( + includePartial = false, + sampleType = PixelIsPoint + ) + + maybeRaster match { + case Left(exception) => + logger.error(s"Feature $id: $exception") + List.empty + + case Right(raster) => + val summary: Option[SUMMARY] = + try { + runPolygonalSummary( + raster, + feature.geom, + rasterizeOptions, + kwargs + ) match { + case polygonal.Summary(result: SUMMARY) => Some(result) + case NoIntersection => None + } + } catch { + case ise: java.lang.IllegalStateException => { + println( + s"There is an issue with geometry for ${feature.data}" + ) + // TODO some very invalid geoms are somehow getting here, skip for now + None + } + case te: org.locationtech.jts.geom.TopologyException => { + println( + s"There is an issue with geometry for ${feature.data}: ${feature.geom}" + ) + None + } + case be: java.lang.ArrayIndexOutOfBoundsException => { + println( + s"There is an issue with geometry for ${feature.data}: ${feature.geom}" + ) + None + } + case ise: java.lang.IllegalArgumentException => { + println( + s"There is an issue with geometry for ${feature.data}: ${feature.geom}" + ) + None } - summary match { - case Some(result) => - features.map { - case feature: Feature[Geometry, FEATUREID] => (feature.data, result) - }.toList - case None => List.empty } + + summary match { + case Some(result) => List((id, result)) + case None => List.empty + } } } } diff --git a/src/main/scala/org/globalforestwatch/summarystats/firealerts/FireAlertsExport.scala b/src/main/scala/org/globalforestwatch/summarystats/firealerts/FireAlertsExport.scala index 08bad28c..b4674d4f 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/firealerts/FireAlertsExport.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/firealerts/FireAlertsExport.scala @@ -131,18 +131,18 @@ object FireAlertsExport extends SummaryExport { import spark.implicits._ val groupByCols = List( - "wdpa_protected_areas__id", - "wdpa_protected_areas__name", - "wdpa_protected_areas__iucn_cat", - "wdpa_protected_areas__iso", - "wdpa_protected_areas__status", + "wdpa_protected_area__id", + "wdpa_protected_area__name", + "wdpa_protected_area__iucn_cat", + "wdpa_protected_area__iso", + "wdpa_protected_area__status", ) val unpackCols = List( - $"featureId.wdpaId" as "wdpa_protected_areas__id", - $"featureId.name" as "wdpa_protected_areas__name", - $"featureId.iucnCat" as "wdpa_protected_areas__iucn_cat", - $"featureId.iso" as "wdpa_protected_areas__iso", - $"featureId.status" as "wdpa_protected_areas__status" + $"featureId.wdpaId" as "wdpa_protected_area__id", + $"featureId.name" as "wdpa_protected_area__name", + $"featureId.iucnCat" as "wdpa_protected_area__iucn_cat", + $"featureId.iso" as "wdpa_protected_area__iso", + $"featureId.status" as "wdpa_protected_area__status" ) _export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true) diff --git a/src/main/scala/org/globalforestwatch/summarystats/gladalerts/GladAlertsExport.scala b/src/main/scala/org/globalforestwatch/summarystats/gladalerts/GladAlertsExport.scala index 648eaaa6..e4c2f43b 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/gladalerts/GladAlertsExport.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/gladalerts/GladAlertsExport.scala @@ -159,18 +159,18 @@ object GladAlertsExport extends SummaryExport { import spark.implicits._ val groupByCols = List( - "wdpa_protected_areas__id", - "wdpa_protected_areas__name", - "wdpa_protected_areas__iucn_cat", - "wdpa_protected_areas__iso", - "wdpa_protected_areas__status" + "wdpa_protected_area__id", + "wdpa_protected_area__name", + "wdpa_protected_area__iucn_cat", + "wdpa_protected_area__iso", + "wdpa_protected_area__status" ) val unpackCols = List( - $"id.wdpaId" as "wdpa_protected_areas__id", - $"id.name" as "wdpa_protected_areas__name", - $"id.iucnCat" as "wdpa_protected_areas__iucn_cat", - $"id.iso" as "wdpa_protected_areas__iso", - $"id.status" as "wdpa_protected_areas__status" + $"id.wdpaId" as "wdpa_protected_area__id", + $"id.name" as "wdpa_protected_area__name", + $"id.iucnCat" as "wdpa_protected_area__iucn_cat", + $"id.iso" as "wdpa_protected_area__iso", + $"id.status" as "wdpa_protected_area__status" ) _export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true) diff --git a/src/main/scala/org/globalforestwatch/summarystats/integrated_alerts/IntegratedAlertsExport.scala b/src/main/scala/org/globalforestwatch/summarystats/integrated_alerts/IntegratedAlertsExport.scala index be62f8b6..2bbb7d78 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/integrated_alerts/IntegratedAlertsExport.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/integrated_alerts/IntegratedAlertsExport.scala @@ -124,18 +124,18 @@ object IntegratedAlertsExport extends SummaryExport { import spark.implicits._ val groupByCols = List( - "wdpa_protected_areas__id", - "wdpa_protected_areas__name", - "wdpa_protected_areas__iucn_cat", - "wdpa_protected_areas__iso", - "wdpa_protected_areas__status" + "wdpa_protected_area__id", + "wdpa_protected_area__name", + "wdpa_protected_area__iucn_cat", + "wdpa_protected_area__iso", + "wdpa_protected_area__status" ) val unpackCols = List( - $"id.wdpaId" as "wdpa_protected_areas__id", - $"id.name" as "wdpa_protected_areas__name", - $"id.iucnCat" as "wdpa_protected_areas__iucn_cat", - $"id.iso" as "wdpa_protected_areas__iso", - $"id.status" as "wdpa_protected_areas__status" + $"id.wdpaId" as "wdpa_protected_area__id", + $"id.name" as "wdpa_protected_area__name", + $"id.iucnCat" as "wdpa_protected_area__iucn_cat", + $"id.iso" as "wdpa_protected_area__iso", + $"id.status" as "wdpa_protected_area__status" ) _export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true, numExportParts = 50) @@ -177,8 +177,6 @@ object IntegratedAlertsExport extends SummaryExport { wdpa: Boolean = false, numExportParts: Int = 10): Unit = { - val changeOnly: Boolean = getAnyMapValue[Boolean](kwargs, "changeOnly") - val cols = groupByCols val df = summaryDF.transform( diff --git a/src/main/scala/org/globalforestwatch/util/SpatialJoinRDD.scala b/src/main/scala/org/globalforestwatch/util/SpatialJoinRDD.scala index e28df7e1..d81a2b29 100644 --- a/src/main/scala/org/globalforestwatch/util/SpatialJoinRDD.scala +++ b/src/main/scala/org/globalforestwatch/util/SpatialJoinRDD.scala @@ -81,7 +81,7 @@ object SpatialJoinRDD { try { queryWindowRDD.spatialPartitioning( - GridType.QUADTREE, + GridType.KDBTREE, Seq(queryWindowPartitions, (queryWindowCount / 2).toInt).min ) valueRDD.spatialPartitioning(queryWindowRDD.getPartitioner)