Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
pspoerri committed Aug 23, 2023
1 parent 9f73081 commit ba0179e
Showing 1 changed file with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
package org.apache.spark.storage

import org.apache.hadoop.io.ElasticByteBufferPool
import org.apache.hadoop.util.Preconditions
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.S3DoubleBufferedStream.{getBuffer, releaseBuffer}

import java.io.{EOFException, InputStream}
import java.nio.ByteBuffer
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) extends InputStream {
class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) extends InputStream with Logging {
private var buffers: Array[ByteBuffer] = {
val array = new Array[ByteBuffer](2)
array(0) = getBuffer(bufferSize)
Expand All @@ -25,6 +27,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
})
array
}
// Per-buffer "prefetch in flight" flags (one slot per entry in `buffers`).
// NOTE(review): not referenced anywhere in the visible portion of this file —
// appears to be dead state introduced by this commit; confirm against the full
// file and remove if genuinely unused.
private var prefetching = Array.fill(2)(false)

var streamClosed = false
var pos: Long = 0
Expand All @@ -44,6 +47,10 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
buffers((bufIdx + 1) % buffers.length)
}

/** Advances `bufIdx` to the other buffer, making the previously-prefetched
  * buffer the current one. Synchronized on `this`; safe to call from the
  * already-synchronized `prepareRead` (monitor locks are reentrant).
  */
private def swap(): Unit = synchronized {
  // Explicit `: Unit` result type: this is a side-effecting method, so it
  // should declare Unit rather than rely on inference from the assignment.
  bufIdx = (bufIdx + 1) % buffers.length
}

private def eof: Boolean = synchronized {
if (buffers == null) {
throw new EOFException("Stream already closed")
Expand All @@ -53,9 +60,9 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte

/** If the current buffer is exhausted and prefetched data is ready, rotate
  * to the prefetched buffer and start refilling the now-spent one.
  *
  * Invariant checked: after the rotation the buffer handed to `doPrefetch`
  * must be fully drained (`remaining == 0`), i.e. `dataAvailable` correctly
  * described the other buffer.
  */
private def prepareRead(): Unit = synchronized {
  if (!currentBuffer.hasRemaining && dataAvailable) {
    // Rotate exactly once via swap(); the scraped diff also showed the
    // pre-image inline `bufIdx = (bufIdx + 1) % buffers.length`, which if
    // kept alongside swap() would advance the index twice per rotation.
    swap()
    dataAvailable = false
    Preconditions.checkState(nextBuffer.remaining() == 0)
    doPrefetch(nextBuffer)
  }
}
Expand All @@ -65,13 +72,19 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
// no data available
return
}
logInfo("Do Prefetch")
val fut = Future[Int] {
buffer.clear()
val len = writeTo(buffer, stream)
if (len < 0) {
throw new EOFException()
}
var len: Int = 0
do {
len = writeTo(buffer, stream)
if (len < 0) {
throw new EOFException()
}
} while (len == 0)
buffer.flip()
Preconditions.checkState(buffer.limit() == len)
Preconditions.checkState(buffer.position() == 0)
len
}(S3ShuffleReader.asyncExecutionContext)
fut.onComplete(onCompletePrefetch)(S3ShuffleReader.asyncExecutionContext)
Expand All @@ -82,7 +95,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
case Failure(exception) => error = Some(exception)
case Success(len) =>
dataAvailable = true
assert(nextBuffer.remaining == len)
Preconditions.checkState(nextBuffer.remaining == len)
}
notifyAll()
}
Expand Down Expand Up @@ -130,7 +143,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
prepareRead()
if (currentBuffer.remaining > 0) {
val l = readFrom(currentBuffer, b, off, len)
if (len < 0) {
if (l < 0) {
throw new SparkException("Invalid state in shuffle read(buf).")
}
pos += l
Expand Down

0 comments on commit ba0179e

Please sign in to comment.