Skip to content

Commit

Permalink
Test code to do a few retries of layer opens/reads
Browse files Browse the repository at this point in the history
Test code to deal with cases where S3 seems do repeatedly drop
connections and cause GDAL layer accesses in checkRequired/checkOptional
to fail after 5 curl acccesses. I'm adding retries (with 5 second
sleeps) in the checkRequired/checkOptional functions for now, and also
in the RequiredILayer/OptionalILayer.fetchWindow calls, just to see if
that helps in getting past the failed S3 operations.
  • Loading branch information
danscales committed Dec 10, 2023
1 parent 054a820 commit 55ccda6
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 21 deletions.
44 changes: 36 additions & 8 deletions src/main/scala/org/globalforestwatch/grids/Grid.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,47 @@ trait Grid[T <: GridSources] {

// NOTE: This check will cause an eager fetch of raster metadata
def checkRequired(layer: RequiredLayer, windowExtent: Extent): Unit = {
require(
layer.source.extent.intersects(windowExtent),
s"${layer.uri} does not intersect: $windowExtent"
)
var retries = 3
var doesIntersect = false
while (retries > 0) {
try {
doesIntersect = layer.source.extent.intersects(windowExtent)
retries = 0
} catch {
case scala.util.control.NonFatal(t) => {
retries -= 1
if (retries == 0) {
throw t
}
println("Retrying checkRequired ${retries}: ${t.getMessage()}")
Thread.sleep(5000)
}
}
}
require(doesIntersect, s"${layer.uri} does not intersect: $windowExtent")
}

// Only check these guys if they're defined
def checkOptional(layer: OptionalLayer, windowExtent: Extent): Unit = {
layer.source.foreach { source =>
require(
source.extent.intersects(windowExtent),
s"${layer.uri} does not intersect: $windowExtent"
)
var retries = 3
var doesIntersect = false
while (retries > 0) {
try {
doesIntersect = source.extent.intersects(windowExtent)
retries = 0
} catch {
case scala.util.control.NonFatal(t) => {
retries -= 1
if (retries == 0) {
throw t
}
println("Retrying checkOptional ${retries}: ${t.getMessage()}")
Thread.sleep(5000)
}
}
}
require(doesIntersect, s"${layer.uri} does not intersect: $windowExtent")
}
}

Expand Down
52 changes: 39 additions & 13 deletions src/main/scala/org/globalforestwatch/layers/Layer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,25 @@ trait RequiredILayer extends RequiredLayer with ILayer {
def fetchWindow(windowKey: SpatialKey,
windowLayout: LayoutDefinition): ITile = {
val layoutSource = LayoutTileSource.spatial(source, windowLayout)
var retries = 3
var tile: Tile = null
while (retries > 0) {
try {
// println(s"Fetching required int tile ${source.dataPath.value}, key ${windowKey}")
val tile = source.synchronized {
layoutSource.read(windowKey).get.band(0)
tile = source.synchronized {
layoutSource.read(windowKey).get.band(0)
}
retries = 0
} catch {
case scala.util.control.NonFatal(t) => {
retries -= 1
if (retries == 0) {
throw t
}
println("Retrying RequiredILayer fetchWindow ${retries}: ${t.getMessage()}")
Thread.sleep(5000)
}
}
}
new ITile(tile)
}
Expand Down Expand Up @@ -329,18 +345,28 @@ trait OptionalILayer extends OptionalLayer with ILayer {
def fetchWindow(windowKey: SpatialKey,
windowLayout: LayoutDefinition): OptionalITile = {
// source.foreach(s => println(s"Fetching optional int tile ${s.dataPath.value}, key ${windowKey}"))
new OptionalITile(for {
source <- source
raster <- Either
.catchNonFatal(source.synchronized {
LayoutTileSource
.spatial(source, windowLayout)
.read(windowKey)
.get
.band(0)
if (source.isEmpty) {
return OptionalITile(None)
}
val layoutSource = LayoutTileSource.spatial(source.get, windowLayout)
var retries = 3
var tile: Option[Tile] = None
while (retries > 0) {
try {
// println(s"Fetching required int tile ${source.dataPath.value}, key ${windowKey}")
tile = Some(source.synchronized {
layoutSource.read(windowKey).get.band(0)
})
.toOption
} yield raster)
retries = 0
} catch {
case scala.util.control.NonFatal(t) => {
retries -= 1
println("Retrying OptionalILayer fetchWindow ${retries}: ${t.getMessage()}")
Thread.sleep(5000)
}
}
}
new OptionalITile(tile)
}
}

Expand Down

0 comments on commit 55ccda6

Please sign in to comment.