Merge pull request #160 from wri/develop
Develop to master
jterry64 authored May 27, 2022
2 parents 4f4dc1f + 275c75a commit 2a3e864
Showing 6 changed files with 90 additions and 117 deletions.
@@ -145,12 +145,11 @@ object FeatureRDD {
Adapter.toSpatialRdd(featureDF, "polyshape")
spatialRDD.analyze()


spatialRDD.rawSpatialRDD = spatialRDD.rawSpatialRDD.rdd.map { geom: Geometry =>
val featureId = FeatureId.fromUserData(featureType, geom.getUserData.asInstanceOf[String], delimiter = ",")
geom.setUserData(featureId)
geom
}
}.repartition(numPartitions = spatialRDD.rawSpatialRDD.getNumPartitions * 10)

splitGeometries(spatialRDD, spark)
}
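
The main change in this hunk appears to be the new `.repartition(numPartitions = spatialRDD.rawSpatialRDD.getNumPartitions * 10)` call, which spreads the mapped geometries over roughly ten times as many partitions before they are split and summarized. Below is a minimal, self-contained sketch of that idea; the SparkSession setup, the toy RDD contents, and the object name are illustrative assumptions, not project code.

```scala
import org.apache.spark.sql.SparkSession

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for spatialRDD.rawSpatialRDD: records landing in only a few partitions.
    val features = sc.parallelize(1 to 1000, numSlices = 4)

    // The diff applies the same idea after mapping FeatureIds onto the geometries:
    // multiply the partition count so later, expensive stages see smaller,
    // better-balanced partitions.
    val rebalanced = features
      .map(identity)                                // placeholder for the FeatureId mapping
      .repartition(features.getNumPartitions * 10)  // same 10x factor as in the diff

    println(s"partitions: ${features.getNumPartitions} -> ${rebalanced.getNumPartitions}")
    spark.stop()
  }
}
```

Raising the partition count right after a skew-prone map is a common way to keep a handful of very large geometries from pinning a single executor.
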
140 changes: 58 additions & 82 deletions src/main/scala/org/globalforestwatch/summarystats/SummaryRDD.scala
@@ -8,6 +8,7 @@ import geotrellis.layer.{LayoutDefinition, SpatialKey}
import geotrellis.raster.summary.polygonal.{NoIntersection, PolygonalSummaryResult}
import geotrellis.raster.summary.polygonal
import geotrellis.store.index.zcurve.Z2
import geotrellis.vector.Extent.toPolygon
import geotrellis.vector._
import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD
@@ -94,8 +95,8 @@ trait SummaryRDD extends LazyLogging with java.io.Serializable {
}

val groupedByKey
: Map[SpatialKey,
Array[(SpatialKey, Feature[Geometry, FEATUREID])]] =
: Map[SpatialKey,
Array[(SpatialKey, Feature[Geometry, FEATUREID])]] =
windowFeature.toArray.groupBy {
case (windowKey, feature) => windowKey
}
@@ -112,89 +113,64 @@ trait SummaryRDD extends LazyLogging with java.io.Serializable {
readWindow(rs, windowKey, windowLayout)
}

// Intersect with the window extent and group by the resulting geometry, so we only run the
// polygonal summary once per distinct geometry per tile. This greatly speeds up overlapping
// geometries, where we often analyze the full window extent across many features.
val groupedByWindowGeom: Map[Geometry, Array[Feature[Geometry, FEATUREID]]] = features.groupBy {
case feature: Feature[Geometry, FEATUREID] =>
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)

try {
if (feature.geom.isValid) {
// if valid geometry, attempt to intersect
feature.geom.intersection(windowGeom)
} else {
// otherwise, just use original geometry, since sometimes
// making complex geometries valid can be very slow
feature.geom
}
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback to original geometry if there are any intersection issues
feature.geom
}
}

groupedByWindowGeom.flatMap {
case (windowGeom: Geometry, features: Array[Feature[Geometry, FEATUREID]]) =>
val rasterizeOptions = Rasterizer.Options(
includePartial = false,
sampleType = PixelIsPoint
)

maybeRaster match {
case Left(exception) =>
logger.error(s"Raster could not bread at spatial key:\n$windowKey\n$exception")
List.empty

case Right(raster) =>
val summary: Option[SUMMARY] =
try {
runPolygonalSummary(
raster,
windowGeom,
rasterizeOptions,
kwargs
) match {
case polygonal.Summary(result: SUMMARY) => Some(result)
case NoIntersection => None
}
} catch {
case ise: java.lang.IllegalStateException => {
println(
s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}"
)
// TODO some very invalid geoms are somehow getting here, skip for now
None
}
case te: org.locationtech.jts.geom.TopologyException => {
println(
s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}"
)
None
}
case be: java.lang.ArrayIndexOutOfBoundsException => {
println(
s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}"
)
None
}
case ise: java.lang.IllegalArgumentException => {
println(
s"There is an issue with running polygonal summary on geometry:\n ${windowGeom}"
)
None
}

// flatMap here flattens out and ignores the errors
features.flatMap { feature: Feature[Geometry, FEATUREID] =>
val id: FEATUREID = feature.data
val rasterizeOptions = Rasterizer.Options(
includePartial = false,
sampleType = PixelIsPoint
)

maybeRaster match {
case Left(exception) =>
logger.error(s"Feature $id: $exception")
List.empty

case Right(raster) =>
val summary: Option[SUMMARY] =
try {
runPolygonalSummary(
raster,
feature.geom,
rasterizeOptions,
kwargs
) match {
case polygonal.Summary(result: SUMMARY) => Some(result)
case NoIntersection => None
}
} catch {
case ise: java.lang.IllegalStateException => {
println(
s"There is an issue with geometry for ${feature.data}"
)
// TODO some very invalid geoms are somehow getting here, skip for now
None
}
case te: org.locationtech.jts.geom.TopologyException => {
println(
s"There is an issue with geometry for ${feature.data}: ${feature.geom}"
)
None
}
case be: java.lang.ArrayIndexOutOfBoundsException => {
println(
s"There is an issue with geometry for ${feature.data}: ${feature.geom}"
)
None
}
case ise: java.lang.IllegalArgumentException => {
println(
s"There is an issue with geometry for ${feature.data}: ${feature.geom}"
)
None
}

summary match {
case Some(result) =>
features.map {
case feature: Feature[Geometry, FEATUREID] => (feature.data, result)
}.toList
case None => List.empty
}

summary match {
case Some(result) => List((id, result))
case None => List.empty
}
}
}
}
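
Taken together, the hunk above swaps the old per-feature path (the deleted `features.flatMap { feature => ... }` block, which ran `runPolygonalSummary` once per feature) for a per-geometry path: each feature is clipped to the window extent, features whose clipped geometries are identical are grouped, the summary runs once per group, and the result is fanned back out to every feature id in that group. The sketch below reproduces only that grouping-and-fan-out pattern with toy stand-ins; the `Geom` alias, `clipToWindow`, and `expensiveSummary` are invented placeholders, not GeoTrellis or project APIs.

```scala
object GroupedSummarySketch {
  type Geom = String                       // stand-in for org.locationtech.jts.geom.Geometry
  final case class Feature(id: Int, geom: Geom)

  // Pretend clipping: geometries larger than the window collapse to the window itself,
  // mimicking features that cover the whole tile.
  def clipToWindow(geom: Geom, window: Geom): Geom =
    if (geom.length > window.length) window else geom

  // Placeholder for the expensive raster polygonal summary.
  def expensiveSummary(geom: Geom): Int = geom.length

  def main(args: Array[String]): Unit = {
    val window = "WINDOW"
    val features = Seq(Feature(1, "BIG_POLYGON_A"), Feature(2, "BIG_POLYGON_B"), Feature(3, "small"))

    val results: Seq[(Int, Int)] = features
      .groupBy(f => clipToWindow(f.geom, window))   // overlapping features share a clipped geometry
      .toSeq
      .flatMap { case (clipped, fs) =>
        val summary = expensiveSummary(clipped)     // computed once per distinct geometry...
        fs.map(f => (f.id, summary))                // ...then shared by every feature in the group
      }

    results.sortBy(_._1).foreach(println)           // (1,6), (2,6), (3,5)
  }
}
```
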
@@ -131,18 +131,18 @@ object FireAlertsExport extends SummaryExport {
import spark.implicits._

val groupByCols = List(
"wdpa_protected_areas__id",
"wdpa_protected_areas__name",
"wdpa_protected_areas__iucn_cat",
"wdpa_protected_areas__iso",
"wdpa_protected_areas__status",
"wdpa_protected_area__id",
"wdpa_protected_area__name",
"wdpa_protected_area__iucn_cat",
"wdpa_protected_area__iso",
"wdpa_protected_area__status",
)
val unpackCols = List(
$"featureId.wdpaId" as "wdpa_protected_areas__id",
$"featureId.name" as "wdpa_protected_areas__name",
$"featureId.iucnCat" as "wdpa_protected_areas__iucn_cat",
$"featureId.iso" as "wdpa_protected_areas__iso",
$"featureId.status" as "wdpa_protected_areas__status"
$"featureId.wdpaId" as "wdpa_protected_area__id",
$"featureId.name" as "wdpa_protected_area__name",
$"featureId.iucnCat" as "wdpa_protected_area__iucn_cat",
$"featureId.iso" as "wdpa_protected_area__iso",
$"featureId.status" as "wdpa_protected_area__status"
)

_export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true)
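
This file and the two export objects that follow make the same mechanical rename: the `wdpa_protected_areas__*` column prefix becomes the singular `wdpa_protected_area__*`, both in `groupByCols` and in the aliases that unpack the nested `featureId`/`id` struct. The sketch below shows that struct-unpacking pattern in isolation; the `FeatureId` and `AlertRow` case classes and the `alertCount` column are assumptions made for illustration, not the repository's actual schema.

```scala
import org.apache.spark.sql.SparkSession

// Toy stand-ins for the exporter's summary rows; the real schema lives in the
// repository's FeatureId classes and is richer than this.
case class FeatureId(wdpaId: Int, name: String, iucnCat: String, iso: String, status: String)
case class AlertRow(featureId: FeatureId, alertCount: Long)

object UnpackColsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("unpack-sketch").getOrCreate()
    import spark.implicits._

    val summaryDF = Seq(AlertRow(FeatureId(1, "Some Park", "II", "BRA", "Designated"), 42L)).toDF()

    // Same pattern as the export objects: alias nested struct fields to flat,
    // singular wdpa_protected_area__* columns.
    val unpacked = summaryDF.select(
      $"featureId.wdpaId" as "wdpa_protected_area__id",
      $"featureId.name" as "wdpa_protected_area__name",
      $"featureId.iucnCat" as "wdpa_protected_area__iucn_cat",
      $"featureId.iso" as "wdpa_protected_area__iso",
      $"featureId.status" as "wdpa_protected_area__status",
      $"alertCount"
    )

    unpacked
      .groupBy("wdpa_protected_area__id", "wdpa_protected_area__iso")
      .sum("alertCount")
      .show()

    spark.stop()
  }
}
```
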
@@ -159,18 +159,18 @@ object GladAlertsExport extends SummaryExport {
import spark.implicits._

val groupByCols = List(
"wdpa_protected_areas__id",
"wdpa_protected_areas__name",
"wdpa_protected_areas__iucn_cat",
"wdpa_protected_areas__iso",
"wdpa_protected_areas__status"
"wdpa_protected_area__id",
"wdpa_protected_area__name",
"wdpa_protected_area__iucn_cat",
"wdpa_protected_area__iso",
"wdpa_protected_area__status"
)
val unpackCols = List(
$"id.wdpaId" as "wdpa_protected_areas__id",
$"id.name" as "wdpa_protected_areas__name",
$"id.iucnCat" as "wdpa_protected_areas__iucn_cat",
$"id.iso" as "wdpa_protected_areas__iso",
$"id.status" as "wdpa_protected_areas__status"
$"id.wdpaId" as "wdpa_protected_area__id",
$"id.name" as "wdpa_protected_area__name",
$"id.iucnCat" as "wdpa_protected_area__iucn_cat",
$"id.iso" as "wdpa_protected_area__iso",
$"id.status" as "wdpa_protected_area__status"
)

_export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true)
@@ -124,18 +124,18 @@ object IntegratedAlertsExport extends SummaryExport {
import spark.implicits._

val groupByCols = List(
"wdpa_protected_areas__id",
"wdpa_protected_areas__name",
"wdpa_protected_areas__iucn_cat",
"wdpa_protected_areas__iso",
"wdpa_protected_areas__status"
"wdpa_protected_area__id",
"wdpa_protected_area__name",
"wdpa_protected_area__iucn_cat",
"wdpa_protected_area__iso",
"wdpa_protected_area__status"
)
val unpackCols = List(
$"id.wdpaId" as "wdpa_protected_areas__id",
$"id.name" as "wdpa_protected_areas__name",
$"id.iucnCat" as "wdpa_protected_areas__iucn_cat",
$"id.iso" as "wdpa_protected_areas__iso",
$"id.status" as "wdpa_protected_areas__status"
$"id.wdpaId" as "wdpa_protected_area__id",
$"id.name" as "wdpa_protected_area__name",
$"id.iucnCat" as "wdpa_protected_area__iucn_cat",
$"id.iso" as "wdpa_protected_area__iso",
$"id.status" as "wdpa_protected_area__status"
)

_export(summaryDF, outputUrl + "/wdpa", kwargs, groupByCols, unpackCols, wdpa = true, numExportParts = 50)
@@ -177,8 +177,6 @@ object IntegratedAlertsExport extends SummaryExport {
wdpa: Boolean = false,
numExportParts: Int = 10): Unit = {

val changeOnly: Boolean = getAnyMapValue[Boolean](kwargs, "changeOnly")

val cols = groupByCols

val df = summaryDF.transform(
@@ -81,7 +81,7 @@ object SpatialJoinRDD {

try {
queryWindowRDD.spatialPartitioning(
GridType.QUADTREE,
GridType.KDBTREE,
Seq(queryWindowPartitions, (queryWindowCount / 2).toInt).min
)
valueRDD.spatialPartitioning(queryWindowRDD.getPartitioner)
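
The partitioning grid changes from `GridType.QUADTREE` to `GridType.KDBTREE`. A quadtree subdivides the extent into fixed quadrants regardless of where the data sits, so clustered geometries can pile into a few partitions, while a KDB-tree splits at data boundaries and tends to keep partition sizes balanced. The toy one-dimensional comparison below illustrates that difference with a fixed midpoint split versus a median split; it deliberately avoids the Sedona API, and every name in it is made up.

```scala
import scala.util.Random

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val rng = new Random(42)

    // Heavily skewed coordinates in [0, 100): most records cluster near the origin.
    val xs = Seq.fill(90)(rng.nextDouble() * 10) ++ Seq.fill(10)(rng.nextDouble() * 100)

    // Quadtree-style: split the extent in half, regardless of where the data sits.
    val extentSplit = 50.0
    val (extentLeft, extentRight) = xs.partition(_ < extentSplit)

    // KDB-style: split at the median of the data, so both sides get similar counts.
    val medianSplit = xs.sorted.apply(xs.size / 2)
    val (kdbLeft, kdbRight) = xs.partition(_ < medianSplit)

    println(s"extent split -> ${extentLeft.size} / ${extentRight.size}")   // lopsided
    println(s"median split -> ${kdbLeft.size} / ${kdbRight.size}")         // roughly even
  }
}
```
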
