Skip to content

Commit

Permalink
Split SpillFramework.remove
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Bellina <[email protected]>
  • Loading branch information
abellina committed Dec 11, 2024
1 parent 5aacedb commit 88c18b7
Showing 1 changed file with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ trait DeviceSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
}

protected def releaseDeviceResource(): Unit = {
SpillFramework.remove(this)
SpillFramework.removeFromDeviceStore(this)
synchronized {
dev.foreach(_.close())
dev = None
Expand Down Expand Up @@ -233,7 +233,7 @@ trait HostSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
}

protected def releaseHostResource(): Unit = {
SpillFramework.remove(this)
SpillFramework.removeFromHostStore(this)
synchronized {
host.foreach(_.close())
host = None
Expand Down Expand Up @@ -904,7 +904,7 @@ class DiskHandle private(
}

override def close(): Unit = {
SpillFramework.remove(this)
SpillFramework.removeFromDiskStore(this)
SpillFramework.stores.diskStore.deleteFile(blockId)
}

Expand Down Expand Up @@ -1572,26 +1572,25 @@ object SpillFramework extends Logging {

var chunkedPackBounceBufferPool: DeviceBounceBufferPool = _

private[spill] def remove(handle: StoreHandle): Unit = {
// if the stores have already shut down, we don't want to create them here
// so we use `storesInternal` directly.
handle match {
case ds: DeviceSpillableHandle[_] =>
synchronized {
Option(storesInternal).map(_.deviceStore)
}.foreach(_.remove(ds))
case hs: HostSpillableHandle[_] =>
synchronized {
Option(storesInternal).map(_.hostStore)
}.foreach(_.remove(hs))
case dh: DiskHandle =>
synchronized {
Option(storesInternal).map(_.diskStore)
}.foreach(_.remove(dh))
case _ =>
throw new IllegalStateException(
s"unknown handle ${handle} cannot be removed")
}
// if the stores have already shut down, we don't want to create them here
// so we use `storesInternal` directly in these remove functions.

private[spill] def removeFromDeviceStore(handle: DeviceSpillableHandle[_]): Unit = {
synchronized {
Option(storesInternal).map(_.deviceStore)
}.foreach(_.remove(handle))
}

private[spill] def removeFromHostStore(handle: HostSpillableHandle[_]): Unit = {
synchronized {
Option(storesInternal).map(_.hostStore)
}.foreach(_.remove(handle))
}

private[spill] def removeFromDiskStore(handle: DiskHandle): Unit = {
synchronized {
Option(storesInternal).map(_.diskStore)
}.foreach(_.remove(handle))
}
}

Expand Down

0 comments on commit 88c18b7

Please sign in to comment.