Log intersection errors (#122)
* Capture intersection errors in ForestChangeDiagnostic

* Supply an alternate analytical pathway that uses validated geometries to log errors in the output; this runs, but bad geometries have already been cleaned out in the first pass using buffer(0)

Handle bad WKB inputs; these must be discarded because some feature Spark SQL expressions can reference the geometry

* Cache the validated RDD so it can be joined

Co-authored-by: Eugene Cheipesh <[email protected]>
jpolchlo and echeipesh authored Nov 1, 2021
1 parent c852a3f commit a20d972
Showing 8 changed files with 247 additions and 23 deletions.
@@ -1,7 +1,12 @@
package org.globalforestwatch.features

import cats.data.NonEmptyList
import geotrellis.vector.Geometry
import com.vividsolutions.{jts => vs}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, isnull, struct, udf}
import org.globalforestwatch.util.GeometryFixer
import scala.util.Try

object SpatialFeatureDF {
def apply(input: NonEmptyList[String],
@@ -51,10 +56,57 @@ object SpatialFeatureDF {
// ST_PrecisionReduce may create invalid geometry if it contains a "sliver" that is below the precision threshold
// ST_Buffer(0) fixes these invalid geometries
featureDF
.selectExpr(
s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
s"struct(${featureObj.featureIdExpr}) as featureId"
)
.where(s"${wkbField} != '${emptyPolygonWKB}'")
}
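// Illustration (a hedged aside, not part of this change): buffer(0) repairs the
// self-intersecting "bowtie" polygons that precision reduction can leave behind.
// Hypothetical values, using the same JTS API that backs the GeoSpark functions above:
//   val reader = new vs.io.WKTReader()
//   val bowtie = reader.read("POLYGON ((0 0, 2 2, 2 0, 0 2, 0 0))")
//   bowtie.isValid            // false: the edges cross at (1, 1)
//   bowtie.buffer(0).isValid  // true: buffer(0) rebuilds it as a valid polygonal geometry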

/*
* Use GeoSpark to directly generate a DataFrame with a geometry column
* Any geometry that fails to be parsed as WKB will be dropped here
*/
def applyValidated(
input: NonEmptyList[String],
featureObj: Feature,
filters: FeatureFilter,
wkbField: String,
spark: SparkSession,
delimiter: String = "\t"
): DataFrame = {
import spark.implicits._

val featureDF: DataFrame = FeatureDF(input, featureObj, filters, spark, delimiter)
val emptyPolygonWKB = "0106000020E610000000000000"
val readOptionWkbUDF = udf{ s: String => readOption(s) }
featureDF
.where(s"${wkbField} != '${emptyPolygonWKB}'")
.selectExpr(
s"ST_Buffer(ST_PrecisionReduce(ST_GeomFromWKB(${wkbField}), 11), 0) AS polyshape",
s"${wkbField} AS wkb",
s"struct(${featureObj.featureIdExpr}) as featureId"
)
.where(s"${wkbField} != '${emptyPolygonWKB}'")
.select(
readOptionWkbUDF(col("wkb")).as("polyshape"),
col("featureId")
)
.where(!isnull('polyshape))
}

private val threadLocalWkbReader = new ThreadLocal[vs.io.WKBReader]

def readOption(wkbHexString: String): Option[vs.geom.Geometry] = {
if (threadLocalWkbReader.get() == null) {
val precisionModel = new vs.geom.PrecisionModel(1e11)
val factory = new vs.geom.GeometryFactory(precisionModel)
val wkbReader = new vs.io.WKBReader(factory)
threadLocalWkbReader.set(wkbReader)
}
val wkbReader = threadLocalWkbReader.get()

Try{
val binWkb = javax.xml.bind.DatatypeConverter.parseHexBinary(wkbHexString)
wkbReader.read(binWkb)
}.toOption
}
}
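
For reference, a minimal sketch of how readOption is expected to behave, with illustrative hex strings: well-formed WKB parses to Some(geometry), while truncated or corrupt hex yields None, which the isnull filter above then drops.

import org.globalforestwatch.features.SpatialFeatureDF.readOption

val point = readOption("0101000000000000000000F03F000000000000F03F")
// point: Some(POINT (1 1)) -- 21-byte little-endian WKB for a point

val broken = readOption("0106000020E61000ZZ")
// broken: None -- the hex decoder (or the WKBReader) throws and Try converts the failure to None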
@@ -0,0 +1,120 @@
package org.globalforestwatch.features

import cats.data.{NonEmptyList, Validated}
import com.vividsolutions.jts.geom.{
Envelope => GeoSparkEnvelope, Geometry => GeoSparkGeometry, Point => GeoSparkPoint, Polygon => GeoSparkPolygon,
MultiPolygon => GeoSparkMultiPolygon, Polygonal => GeoSparkPolygonal, GeometryCollection => GeoSparkGeometryCollection
}
import org.apache.log4j.Logger
import geotrellis.store.index.zcurve.Z2
import geotrellis.vector
import geotrellis.vector.{Geometry, MultiPolygon}
import org.apache.spark.HashPartitioner
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geosparksql.utils.Adapter
import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}
import org.globalforestwatch.util.{GridRDD, SpatialJoinRDD}
import org.globalforestwatch.util.IntersectGeometry.{extractPolygons, validatedIntersection}
import org.globalforestwatch.util.Util.getAnyMapValue
import org.globalforestwatch.util.ImplicitGeometryConverter._

object ValidatedFeatureRDD {
val logger = Logger.getLogger("FeatureRDD")

/**
* Reads features from source and optionally splits them by 1x1 degree grid.
* - If the feature WKB is invalid, the feature will be dropped
* - If there is a problem with the intersection logic, the erroring feature id will propagate to the output
*/
def apply(
input: NonEmptyList[String],
featureType: String,
filters: FeatureFilter,
splitFeatures: Boolean,
spark: SparkSession
): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {

if (splitFeatures) {
val featureObj: Feature = Feature(featureType)
val featureDF: DataFrame = SpatialFeatureDF.applyValidated(input, featureObj, filters, "geom", spark)
splitGeometries(featureType, featureDF, spark)
} else {
val featureObj: Feature = Feature(featureType)
val featuresDF: DataFrame = FeatureDF(input, featureObj, filters, spark)

featuresDF.rdd
.mapPartitions(
{ iter: Iterator[Row] =>
for {
i <- iter
if featureObj.isNonEmptyGeom(i)
} yield {
val feat = featureObj.get(i)
(feat.data, Validated.Valid(feat))
}
},
preservesPartitioning = true
)
}
}

private def splitGeometries(
featureType: String,
featureDF: DataFrame,
spark: SparkSession
): RDD[(FeatureId, ValidatedRow[geotrellis.vector.Feature[Geometry, FeatureId]])] = {

val spatialFeatureRDD: SpatialRDD[GeoSparkGeometry] = Adapter.toSpatialRdd(featureDF, "polyshape")
spatialFeatureRDD.analyze()

spatialFeatureRDD.rawSpatialRDD = spatialFeatureRDD.rawSpatialRDD.rdd.map { geom: GeoSparkGeometry =>
val featureId = FeatureId.fromUserData(featureType, geom.getUserData.asInstanceOf[String], delimiter = ",")
geom.setUserData(featureId)
geom
}

val envelope: GeoSparkEnvelope = spatialFeatureRDD.boundaryEnvelope
val spatialGridRDD = GridRDD(envelope, spark, clip = true)
val flatJoin: JavaPairRDD[GeoSparkPolygon, GeoSparkGeometry] =
SpatialJoinRDD.flatSpatialJoin(
spatialFeatureRDD,
spatialGridRDD,
considerBoundaryIntersection = true
)

/*
Partitions will come back very skewed, and we will need to even them out for any downstream analysis.
For the summary analysis we will eventually use a range partitioner.
However, the range partitioner uses sampling to come up with the break points for the different partitions.
If the input RDD is already heavily skewed, sampling will be off and the range partitioner won't do a good job.
*/
val hashPartitioner = new HashPartitioner(flatJoin.getNumPartitions)

flatJoin.rdd
.keyBy({ pair: (GeoSparkPolygon, GeoSparkGeometry) =>
Z2(
(pair._1.getCentroid.getX * 100).toInt,
(pair._1.getCentroid.getY * 100).toInt
).z
})
.partitionBy(hashPartitioner)
.flatMap { case (_, (gridCell, geom)) =>
val (fid, geometries) = validatedIntersection(geom, gridCell)
geometries.traverse { geoms => geoms }.map { vg => (fid.asInstanceOf[FeatureId], vg) }
}
.flatMap {
case (_, Validated.Valid(mp)) if mp.isEmpty =>
// This is a Valid intersection result, but with no area, i.e. not a real intersection
None // flatMap will drop the None
case (fid, validated) =>
val validatedFeature = validated.map { intersection =>
val geotrellisGeom: MultiPolygon = intersection
geotrellis.vector.Feature(geotrellisGeom, fid)
}
Some(fid -> validatedFeature)
}
}
}
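
A hedged sketch of the keying scheme used in splitGeometries above (illustrative coordinates): each grid-cell/feature pair is keyed by the Z-order index of the cell centroid scaled to hundredths of a degree, so the HashPartitioner can spread a heavily skewed join result across partitions before the sampling-based range partitioning happens downstream.

import geotrellis.store.index.zcurve.Z2
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)

// Two neighbouring 1x1-degree cells with centroids at (10.5, 4.5) and (11.5, 4.5):
val keyA = Z2((10.5 * 100).toInt, (4.5 * 100).toInt).z
val keyB = Z2((11.5 * 100).toInt, (4.5 * 100).toInt).z

// The interleaved-bit keys differ even for adjacent centroids, so hashing them
// usually lands the two cells in different partitions instead of piling up in one.
partitioner.getPartition(keyA)
partitioner.getPartition(keyB)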
@@ -14,7 +14,7 @@ import org.globalforestwatch.features.FeatureId
import org.globalforestwatch.grids.GridSources
import scala.reflect.ClassTag
import cats.kernel.Semigroup
import cats.data.Validated.{Valid, Invalid}
import cats.data.Validated.{Valid, Invalid, valid, invalid}
import org.globalforestwatch.summarystats.forest_change_diagnostic.ForestChangeDiagnosticSummary


@@ -35,7 +35,7 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
windowLayout: LayoutDefinition,
kwargs: Map[String, Any],
partition: Boolean = true
)(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID], ord: Ordering[SUMMARY] = null): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {
)(implicit kt: ClassTag[SUMMARY], vt: ClassTag[FEATUREID]): RDD[(FEATUREID, ValidatedRow[SUMMARY])] = {

/* Intersect features with each tile from the windowLayout grid and generate a record for each intersection.
* Each feature will intersect one or more windows, possibly creating a duplicate record.
@@ -180,4 +180,4 @@
object ErrorSummaryRDD {
val rasterizeOptions: Rasterizer.Options =
Rasterizer.Options(includePartial = false, sampleType = PixelIsPoint)
}
}
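
The new signature pairs every feature id with a ValidatedRow. A hedged sketch of that convention, using local stand-ins for the repo's JobError, GeometryError, and summary types (the real definitions live in org.globalforestwatch.summarystats): a row is either a Valid summary or an Invalid error, and callers pattern-match instead of catching exceptions.

import cats.data.Validated
import cats.data.Validated.{Invalid, Valid}

object ValidatedRowSketch {
  // Stand-ins for the repo's error hierarchy (hypothetical, for illustration only):
  sealed trait JobError
  final case class GeometryError(msg: String) extends JobError
  type ValidatedRow[A] = Validated[JobError, A]

  val ok: ValidatedRow[Int] = Valid(42)
  val bad: ValidatedRow[Int] = Invalid(GeometryError("Failed intersection with POLYGON (...)"))

  def render(row: ValidatedRow[Int]): String = row match {
    case Valid(summary) => s"summary=$summary"
    case Invalid(error) => s"error=$error" // written to the output instead of failing the job
  }
}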
@@ -1,7 +1,7 @@
package org.globalforestwatch.summarystats.forest_change_diagnostic

import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import cats.data.Validated.{Invalid, Valid, invalid, valid}
import cats.syntax._

import java.util
@@ -25,7 +25,7 @@
val name = "forest_change_diagnostic"

def apply(
mainRDD: RDD[Feature[Geometry, FeatureId]],
validatedRDD: RDD[(FeatureId, ValidatedRow[Feature[Geometry, FeatureId]])],
featureType: String,
intermediateListSource: Option[NonEmptyList[String]],
fireAlertRDD: SpatialRDD[GeoSparkGeometry],
@@ -34,24 +34,27 @@

val runOutputUrl: String = getOutputUrl(kwargs)

mainRDD.persist(StorageLevel.MEMORY_AND_DISK_2)
// These locations can't be processed because of an error in handling their geometry
val erroredLocationsRDD = validatedRDD.flatMap{ case (fid, feat) => feat.toEither.left.toOption.map { err => (fid, err) } }
val mainRDD = validatedRDD.flatMap{ case (_, feat) => feat.toEither.right.toOption }

validatedRDD.persist(StorageLevel.MEMORY_AND_DISK_2)

// For standard GFW Pro Feature IDs we create a Grid Filter
// This will allow us to only process those parts of the dissolved list geometry which were updated
// When using other Feature IDs such as WDPA or GADM,
// there will be no intermediate results and this part will be ignored
val gridFilter: List[String] =
mainRDD
.filter { feature: Feature[Geometry, FeatureId] =>
feature.data match {
case gfwproId: GfwProFeatureId => gfwproId.locationId == -2
case _ => false
}
}
.map(f => pointGridId(f.geom.getCentroid, 1))
.collect
.toList

val featureRDD: RDD[Feature[Geometry, FeatureId]] =
toFeatureRdd(mainRDD, gridFilter, intermediateListSource.isDefined)
@@ -95,7 +98,10 @@
kwargs)
else dataRDD

val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(finalRDD, spark)
val rejoinedRDD = finalRDD
.union(erroredLocationsRDD.map{ case (fid, err) => (fid, invalid[JobError, ForestChangeDiagnosticData](err)) })

val summaryDF = ForestChangeDiagnosticDF.getFeatureDataFrame(rejoinedRDD, spark)

ForestChangeDiagnosticExport.export(
featureType,
@@ -299,4 +305,4 @@
)
)
}
}
}
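
The split-and-rejoin added above, shown on plain Scala collections as a hedged sketch (made-up location ids; the real code does the same on RDDs): invalid rows are peeled off first, the analysis runs only on the valid features, and the errors are unioned back in so every input location appears in the export.

import cats.data.Validated.{Invalid, Valid}

val validated = List(
  1 -> Valid("POLYGON ((0 0, 1 0, 1 1, 0 0))"),
  2 -> Invalid("Failed intersection"), // this location can't be processed
  3 -> Valid("POLYGON ((5 5, 6 5, 6 6, 5 5))")
)

// Peel off the errors and keep only the valid geometries for the analysis:
val errored = validated.collect { case (fid, Invalid(err)) => fid -> err }
val main = validated.collect { case (fid, Valid(geom)) => fid -> geom }

// ... run the real analysis over `main` (here just a dummy per-feature "summary") ...
val analysed = main.map { case (fid, geom) => fid -> Valid(geom.length) }

// Union the errors back in so errored locations still show up in the results:
val output = analysed ++ errored.map { case (fid, err) => fid -> Invalid(err) }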
@@ -40,7 +40,7 @@ object ForestChangeDiagnosticCommand extends SummaryCommand with LazyLogging {
val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions)

runAnalysis { implicit spark =>
val featureRDD = FeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
val featureRDD = ValidatedFeatureRDD(default.featureUris, default.featureType, featureFilter, splitFeatures = true, spark)
val fireAlertRDD = FireAlertRDD(spark, fireAlert.alertType, fireAlert.alertSource, FeatureFilter.empty)
ForestChangeDiagnosticAnalysis(featureRDD, default.featureType, intermediateListSource, fireAlertRDD, kwargs)
}
@@ -2,6 +2,7 @@ package org.globalforestwatch.util
import org.apache.log4j.Logger
import geotrellis.vector.{
Geometry,
GeomFactory,
LineString,
MultiPoint,
Point,
@@ -11,6 +12,7 @@ import geotrellis.vector.{
}
import geotrellis.vector.io.wkb.WKB
import org.globalforestwatch.util.GeotrellisGeometryReducer.{gpr, reduce}
import scala.util.Try

object GeotrellisGeometryValidator extends java.io.Serializable {
val logger = Logger.getLogger("Geotrellis Geometry Validator")
@@ -59,8 +61,8 @@ object GeotrellisGeometryValidator extends java.io.Serializable {
}

def makeValidGeom(wkb: String): Geometry = {
val geom: Geometry = WKB.read(wkb)
makeValidGeom(geom)
val geom: Option[Geometry] = Try(WKB.read(wkb)).toOption
geom.map(makeValidGeom(_)).getOrElse(GeomFactory.factory.createPoint())
}

def makeMultiGeom(geom: Geometry): Geometry = {
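
A minimal sketch of the new fallback behaviour in makeValidGeom (illustrative inputs): parseable WKB is validated as before, while hex that WKB.read cannot parse now degrades to an empty point instead of throwing, so one corrupt row no longer fails the whole job.

import org.globalforestwatch.util.GeotrellisGeometryValidator.makeValidGeom

// Well-formed WKB hex (POINT (1 1)) still comes back as a usable geometry:
val good = makeValidGeom("0101000000000000000000F03F000000000000F03F")
// good.isEmpty == false

// Corrupt hex no longer throws; it becomes an empty point that later filters can drop:
val bad = makeValidGeom("not valid wkb hex")
// bad.isEmpty == true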
44 changes: 44 additions & 0 deletions src/main/scala/org/globalforestwatch/util/IntersectGeometry.scala
@@ -1,13 +1,18 @@
package org.globalforestwatch.util

import cats.data.Validated
import cats.data.Validated.{Valid, Invalid}
import com.vividsolutions.jts.geom.{
Geometry,
GeometryCollection,
MultiPolygon,
Polygon,
TopologyException,
}
import scala.util.{Try, Success, Failure}

import org.globalforestwatch.util.GeoSparkGeometryConstructor.createMultiPolygon
import org.globalforestwatch.summarystats.{GeometryError, ValidatedRow}

object IntersectGeometry {

@@ -39,6 +44,45 @@ object IntersectGeometry {
}
case _ => List()
}
}

def validatedIntersection(
thisGeom: Geometry,
thatGeom: Geometry
): (Object, ValidatedRow[List[MultiPolygon]]) = {

/**
* Intersection can return GeometryCollections.
* Here we filter the resulting geometries and keep only the polygonal parts, returned as MultiPolygons.
*/
val userData = thisGeom.getUserData

val attempt = Try {
val intersection: Geometry = thisGeom intersection thatGeom
intersection match {
case poly: Polygon =>
val multi = createMultiPolygon(Array(poly))
multi.setUserData(userData)
List(multi)
case multi: MultiPolygon =>
multi.setUserData(userData)
List(multi)
case collection: GeometryCollection =>
val maybe_multi = extractPolygons(collection)
maybe_multi match {
case Some(multi) =>
multi.setUserData(userData)
List(multi)
case _ => List()
}
case _ => List()
}
}

(userData, attempt match {
case Success(v) => Valid(v)
case Failure(e) => Invalid(GeometryError(s"Failed intersection with ${thatGeom}"))
})

}
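
A hedged sketch of how the new helper is consumed (mirroring the call site in ValidatedFeatureRDD, with made-up geometries and user data): the feature's user data rides along as the key, and any failure inside the Try — for example a TopologyException — surfaces as Invalid(GeometryError(...)) rather than an exception.

import cats.data.Validated.{Invalid, Valid}
import com.vividsolutions.jts.io.WKTReader
import org.globalforestwatch.util.IntersectGeometry.validatedIntersection

val reader = new WKTReader()
val feature = reader.read("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))")
feature.setUserData("list-1,location-7") // hypothetical feature id payload
val gridCell = reader.read("POLYGON ((1 1, 3 1, 3 3, 1 3, 1 1))")

val (featureId, result) = validatedIntersection(feature, gridCell)

result match {
  case Valid(multiPolygons) => multiPolygons.foreach(mp => println(s"$featureId -> ${mp.getArea}"))
  case Invalid(err) => println(s"$featureId failed: $err") // logged into the output instead of thrown
}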

2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "1.7.8"
version in ThisBuild := "1.7.9"
