From 221b1c9c2bde0c24ecd3c583b0fdff91f3e54670 Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Sat, 30 Mar 2024 20:22:03 +0100 Subject: [PATCH] load_stac for netcdf: improved partitioning https://github.com/Open-EO/openeo-geotrellis-extensions/issues/270 --- .../openeo/geotrellis/layers/NetCDFCollection.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala index b9ebce30..eb3198a0 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/layers/NetCDFCollection.scala @@ -7,11 +7,12 @@ import geotrellis.raster.{CellSize, MultibandTile, Raster, RasterExtent, Tile, T import geotrellis.raster.gdal.{DefaultDomain, GDALException, GDALRasterSource, MalformedProjectionException} import geotrellis.spark.{ContextRDD, MultibandTileLayerRDD, withTilerMethods} import geotrellis.spark._ +import geotrellis.spark.partition.SpacePartitioner import geotrellis.vector.{Extent, ProjectedExtent} -import org.apache.spark.SparkContext +import org.apache.spark.{Partitioner, SparkContext} import org.apache.spark.rdd.RDD import org.openeo.geotrellis.ProjectedPolygons -import org.openeo.geotrelliscommon.DataCubeParameters +import org.openeo.geotrelliscommon.{ByTileSpacetimePartitioner, ByTileSpatialPartitioner, DataCubeParameters} import org.openeo.opensearch.OpenSearchClient import java.time.{LocalDate, ZoneId, ZonedDateTime} @@ -45,7 +46,7 @@ object NetCDFCollection { val bboxWGS84: Extent = items.map(_.bbox).reduce((a, b)=>(a.combine(b))) - val features: RDD[(TemporalProjectedExtent, MultibandTile)] = items.flatMap(f=>{ + val features: RDD[(TemporalProjectedExtent, MultibandTile)] = items.repartition(stacItems.length).flatMap(f=>{ val allTiles = f.links.flatMap(l=>{ l.bandNames.get.flatMap(b=> { var gdalNetCDFLink = s"${l.href.toString.replace("file:", "NETCDF:")}:${b}" @@ -106,8 +107,10 @@ object NetCDFCollection { val spatialBounds = KeyBounds(layout.mapTransform(extent)) val temporalBounds = KeyBounds(SpaceTimeKey(spatialBounds.minKey,TemporalKey(LocalDate.of(1990,1,1).atStartOfDay(ZoneId.of("UTC")))),SpaceTimeKey(spatialBounds.maxKey,TemporalKey(LocalDate.now().atStartOfDay(ZoneId.of("UTC"))))) + val partitioner: Partitioner = new SpacePartitioner(temporalBounds)(implicitly, implicitly, ByTileSpacetimePartitioner) + val metadata = TileLayerMetadata[SpaceTimeKey](cellType, layout, extent, crs(0), temporalBounds) - val retiled: RDD[(SpaceTimeKey, MultibandTile)] = features.tileToLayout(metadata) + val retiled: RDD[(SpaceTimeKey, MultibandTile)] = features.tileToLayout(metadata).partitionBy(partitioner) ContextRDD(retiled,metadata)