Fix buffer size and print statistics.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed Aug 28, 2023
1 parent 2077860 commit 983ba4d
Showing 5 changed files with 74 additions and 33 deletions.
11 changes: 5 additions & 6 deletions README.md
@@ -38,25 +38,23 @@ These configuration values need to be passed to Spark to load and configure the

Changing these values might have an impact on performance. A sample configuration is sketched after this list.

- `spark.shuffle.s3.bufferSize`: Default size of the buffered output streams (default: `32768`,
uses `spark.shuffle.file.buffer` as default)
- `spark.shuffle.s3.bufferSize`: The maximum buffer size (default: `4194304`). Max memory consumption: `4x NUM_TASKS`.
- `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`)
- `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`)
- `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
- `spark.shuffle.s3.folderPrefixes`: The number of prefixes to use when storing files on S3
(default: `10`, minimum: `1`).

**Note**: This option can be used to optimize performance on object stores which have a prefix rate-limit.
- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
concurrently per task.
- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
pool used for prefetching the shuffle blocks.
- `spark.shuffle.s3.supportsUnbuffer`: Streams can be unbuffered instead of closed (default: `true` if the storage backend is S3A, `false` otherwise).
- `spark.shuffle.checksum.enabled`: Enables checksums on Shuffle files (default: `true`)

**Note**: This option creates additional overhead if active. Suggested configuration: `false`.

- `spark.shuffle.s3.minimumReadSize`: The minimum size used for reading from the remote storage (default: `1024`).
- `spark.shuffle.s3.bufferSensitivity`: The sensitivity when adapting the buffer size to the RTT (default: `100`).


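As an illustration, these options can be set like any other Spark configuration, for example on a `SparkSession` builder. The sketch below only uses keys documented above; the application name and the chosen values are placeholders, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: overriding a few of the documented spark.shuffle.s3.* options.
val spark = SparkSession.builder()
  .appName("s3-shuffle-example")                      // placeholder name
  .config("spark.shuffle.s3.bufferSize", "4194304")   // 4 MiB read buffers
  .config("spark.shuffle.s3.folderPrefixes", "10")    // spread objects over 10 prefixes
  .config("spark.shuffle.s3.prefetchBatchSize", "25") // partitions prefetched per task
  .config("spark.shuffle.checksum.enabled", "false")  // suggested above to avoid overhead
  .getOrCreate()
```

The same keys can also be passed with `--conf` on `spark-submit`.
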
### Debug configuration options

@@ -75,6 +73,7 @@ Configuration options used for debugging:

**Note**: Can lead to invalid results.


## Testing

The tests require the following environment variables to be set:
@@ -34,20 +34,19 @@ class S3ShuffleDispatcher extends Logging {
private val isS3A = rootDir.startsWith("s3a://")

// Optional
val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024)
val bufferInputSize: Int = conf.getInt("spark.shuffle.s3.bufferInputSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = 4 * 1024 * 1024)
val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true)
val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true)
val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
val folderPrefixes: Int = conf.getInt("spark.shuffle.s3.folderPrefixes", defaultValue = 10)
val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)
val supportsUnbuffer: Boolean = conf.getBoolean("spark.shuffle.s3.supportsUnbuffer", defaultValue = isS3A)

// Debug
val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
val useBlockManager: Boolean = conf.getBoolean("spark.shuffle.s3.useBlockManager", defaultValue = true)
val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)
val minimumReadSize: Int = conf.getInt("spark.shuffle.s3.minimumReadSize", defaultValue = 1024)
val bufferSensitivity: Int = conf.getInt("spark.shuffle.s3.bufferSensitivity", defaultValue = 100)

// Spark feature
val checksumAlgorithm: String = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
@@ -63,19 +62,18 @@ class S3ShuffleDispatcher extends Logging {

// Optional
logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
logInfo(s"- spark.shuffle.s3.bufferInputSize=${bufferInputSize}")
logInfo(s"- spark.shuffle.s3.cachePartitionLengths=${cachePartitionLengths}")
logInfo(s"- spark.shuffle.s3.cacheChecksums=${cacheChecksums}")
logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
logInfo(s"- spark.shuffle.s3.folderPrefixes=${folderPrefixes}")
logInfo(s"- spark.shuffle.s3.prefetchBlockSize=${prefetchBatchSize}")
logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")
logInfo(s"- spark.shuffle.s3.supportsUnbuffer=${supportsUnbuffer}")

// Debug
logInfo(s"- spark.shuffle.s3.alwaysCreateIndex=${alwaysCreateIndex} (default: false)")
logInfo(s"- spark.shuffle.s3.useBlockManager=${useBlockManager} (default: true)")
logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch} (default: false)")
logInfo(s"- spark.shuffle.s3.minimumReadSize=${minimumReadSize}")
logInfo(s"- spark.shuffle.s3.bufferSensitivity=${bufferSensitivity}")

// Spark
logInfo(s"- ${config.SHUFFLE_CHECKSUM_ALGORITHM.key}=${checksumAlgorithm}")
@@ -7,14 +7,21 @@ package org.apache.spark.storage

import org.apache.hadoop.io.ElasticByteBufferPool
import org.apache.spark.SparkException
import org.apache.spark.shuffle.helper.S3ShuffleDispatcher
import org.apache.spark.storage.S3DoubleBufferedStream.{getBuffer, releaseBuffer}

import java.io.{EOFException, InputStream}
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global

class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) extends InputStream {
class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int,
timeWaiting: AtomicLong,
timePrefetching: AtomicLong,
bytesRead: AtomicLong,
numRequests: AtomicLong) extends InputStream {
private var buffers: Array[ByteBuffer] = {
val array = new Array[ByteBuffer](2)
array(0) = getBuffer(bufferSize)
@@ -25,12 +32,12 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
})
array
}
private var prefetching = Array.fill(2)(false)

var streamClosed = false
var pos: Long = 0
val maxBytes: Long = stream.maxBytes


private var bufIdx: Int = 0
var dataAvailable: Boolean = false
var error: Option[Throwable] = None
@@ -69,19 +76,25 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
// no data available
return
}
// Run on implicit global execution context.
val fut = Future[Int] {
val now = System.nanoTime()
buffer.clear()
var len: Int = 0
do {
len = writeTo(buffer, stream)
len = writeTo(buffer, stream, bufferSize)
if (len < 0) {
throw new EOFException()
}
} while (len == 0)
buffer.flip()

timePrefetching.addAndGet(System.nanoTime() - now)
numRequests.incrementAndGet()
bytesRead.addAndGet(len)
len
}(S3ShuffleReader.asyncExecutionContext)
fut.onComplete(onCompletePrefetch)(S3ShuffleReader.asyncExecutionContext)
}
fut.onComplete(onCompletePrefetch)
}

private def onCompletePrefetch(result: Try[Int]): Unit = synchronized {
@@ -97,7 +110,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
if (eof) {
return -1
}

val now = System.nanoTime()
while (error.isEmpty) {
if (buffers == null) {
throw new EOFException("Stream already closed")
@@ -108,6 +121,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
if (l < 0) {
throw new SparkException("Invalid state in shuffle read.")
}
timeWaiting.addAndGet(System.nanoTime() - now)
pos += 1
return l
}
@@ -129,6 +143,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
if (eof) {
return -1
}
val now = System.nanoTime()
while (error.isEmpty) {
if (buffers == null) {
throw new EOFException("Stream already closed")
@@ -139,6 +154,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
if (l < 0) {
throw new SparkException("Invalid state in shuffle read(buf).")
}
timeWaiting.addAndGet(System.nanoTime() - now)
pos += l
return l
}
@@ -212,17 +228,18 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
buf.get() & 0xFF
}

private def writeTo(buf: ByteBuffer, src: InputStream): Int = {
val len = src.read(buf.array(), buf.position() + buf.arrayOffset(), buf.remaining())
private def writeTo(buf: ByteBuffer, src: InputStream, size: Int): Int = {
val len = src.read(buf.array(), buf.position() + buf.arrayOffset(), math.min(buf.remaining(), size))
buf.position(buf.position() + len)
len
}
}

object S3DoubleBufferedStream {

private lazy val pool = new ElasticByteBufferPool()

private def getBuffer(size: Int) = pool.getBuffer(false, size)
private def getBuffer(size: Int): ByteBuffer = pool.getBuffer(false, size)

private def releaseBuffer(buf: ByteBuffer) = pool.putBuffer(buf)
private def releaseBuffer(buf: ByteBuffer): Unit = pool.putBuffer(buf)
}
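To make the control flow above easier to follow, here is a simplified, self-contained sketch of the same double-buffering idea: one buffer is drained by the reader while a `Future` on the implicit global execution context refills the other, and `AtomicLong` counters record prefetch time, bytes, and request count. This is an illustrative sketch, not the class above; it omits buffer pooling, `unbuffer` support, error propagation, and the `timeWaiting` counter.

```scala
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

// Simplified double-buffered reader: the caller drains `active` while `pending`
// is refilled asynchronously; the counters mirror the statistics collected above.
class DoubleBufferedReader(src: InputStream, bufferSize: Int,
                           timePrefetching: AtomicLong,
                           bytesRead: AtomicLong,
                           numRequests: AtomicLong) {
  private var active: ByteBuffer = ByteBuffer.allocate(0) // empty => first read() swaps
  private var pending: Future[ByteBuffer] = refill()      // start prefetching immediately

  // Fill a fresh buffer off the calling thread and record statistics.
  private def refill(): Future[ByteBuffer] = Future {
    val start = System.nanoTime()
    val buf = ByteBuffer.allocate(bufferSize)
    val len = src.read(buf.array(), 0, bufferSize)
    timePrefetching.addAndGet(System.nanoTime() - start)
    numRequests.incrementAndGet()
    if (len > 0) { bytesRead.addAndGet(len); buf.limit(len) } else buf.limit(0)
    buf
  }

  /** Read one byte, swapping buffers when the active one is exhausted; -1 at EOF. */
  def read(): Int = {
    if (!active.hasRemaining) {
      val next = Await.result(pending, Duration.Inf) // wait for the prefetch to finish
      if (!next.hasRemaining) return -1              // nothing more to read
      active = next
      pending = refill()                             // immediately schedule the next block
    }
    active.get() & 0xFF
  }
}
```

Usage-wise this behaves like a plain `InputStream.read()` loop, but the next block is already in flight while the current one is consumed, which hides part of the remote-storage round-trip latency.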
34 changes: 24 additions & 10 deletions src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
@@ -28,11 +28,11 @@ import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReadMetricsReporter, ShuffleReader}
import org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchBlockInfo
import org.apache.spark.util.{CompletionIterator, ThreadUtils}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.{InterruptibleIterator, SparkConf, SparkEnv, TaskContext}

import scala.concurrent.{ExecutionContext}
import java.util.concurrent.atomic.AtomicLong

/**
* This class was adapted from Apache Spark: BlockStoreShuffleReader.
@@ -53,6 +53,11 @@ class S3ShuffleReader[K, C](
private val dispatcher = S3ShuffleDispatcher.get
private val dep = handle.dependency

private val timeWaiting = new AtomicLong(0)
private val timePrefetching = new AtomicLong(0)
private val bytesRead = new AtomicLong(0)
private val numRequests = new AtomicLong(0)

private val fetchContinousBlocksInBatch: Boolean = {
val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
val compressed = conf.get(config.SHUFFLE_COMPRESS)
@@ -82,7 +87,7 @@ class S3ShuffleReader[K, C](
useBlockManager = dispatcher.useBlockManager)

val wrappedStreams = new S3ShuffleBlockIterator(blocks)
val bufferSize = dispatcher.bufferSize.toInt
val bufferSize = dispatcher.bufferSize

// Create a key/value iterator for each stream
val streamIter = wrappedStreams.filterNot(_._2.maxBytes == 0).map { case (blockId, wrappedStream) =>
@@ -92,7 +97,11 @@ class S3ShuffleReader[K, C](
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.

val stream = new S3DoubleBufferedStream(wrappedStream, bufferSize)
val stream = new S3DoubleBufferedStream(wrappedStream, bufferSize,
timeWaiting = timeWaiting,
timePrefetching = timePrefetching,
bytesRead = bytesRead,
numRequests = numRequests)
val checkedStream = if (dispatcher.checksumEnabled) {
new S3ChecksumValidationStream(blockId, stream, dispatcher.checksumAlgorithm)
} else {
@@ -102,12 +111,22 @@ class S3ShuffleReader[K, C](
(blockId, checkedStream)
}

val recordIter = new PrefetchIterator(streamIter).flatMap { case (blockId, stream) =>
val prefetchIter = new PrefetchIterator(streamIter).flatMap { case (blockId, stream) =>
serializerInstance
.deserializeStream(serializerManager.wrapStream(blockId, stream))
.asKeyValueIterator
}

val recordIter = new StatisticsIterator(prefetchIter, () => {
val tW = timeWaiting.get() / 1000000
val tP = timePrefetching.get() / 1000000
val bR = bytesRead.get()
val r = numRequests.get()
val tR = tP / r
val bW = bR.toDouble / (tP.toDouble / 1000) / (1024 * 1024)
logInfo(s"S3ShuffleReader statistics: ${bR} bytes read, ${tW} ms time waiting, ${tP} ms time prefetching (${tR} time/request) - ${bW} MiB/s")
})

// Update the context task metrics for each record read.
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
@@ -181,8 +200,3 @@ class S3ShuffleReader[K, C](
}
}
}

object S3ShuffleReader {
private lazy val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("s3-shuffle-reader-async-thread-pool", S3ShuffleDispatcher.get.prefetchThreadPoolSize)
lazy implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
}
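The quantities in the `S3ShuffleReader statistics` log line follow directly from the four counters; the snippet below reproduces the arithmetic with made-up numbers (all values are hypothetical, purely to illustrate the units).

```scala
// Hypothetical counter values for one task, chosen only to illustrate the units.
val bytesRead       = 268435456L            // 256 MiB read in total
val numRequests     = 64L                   // prefetch requests issued
val timePrefetching = 4000000000L           // 4 s spent prefetching, in nanoseconds

val tP = timePrefetching / 1000000          // 4000 ms spent prefetching
val tR = tP / numRequests                   // 62 ms per request (integer division)
val bW = bytesRead.toDouble / (tP.toDouble / 1000) / (1024 * 1024) // 64.0 MiB/s
```

`timeWaiting` is converted to milliseconds in the same way before it is logged.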
13 changes: 13 additions & 0 deletions src/main/scala/org/apache/spark/storage/StatisticsIterator.scala
@@ -0,0 +1,13 @@
package org.apache.spark.storage

/** Invokes `fun` when the wrapped iterator reports that it is exhausted (used to log statistics). */
class StatisticsIterator[A](iter: Iterator[A], fun: () => Unit) extends Iterator[A] {
override def hasNext: Boolean = {
if (!iter.hasNext) {
fun()
return false
}
true
}

override def next(): A = iter.next()
}

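A minimal usage sketch (names and values are illustrative): the wrapper forwards `next()` untouched and runs the callback when `hasNext` reports that the underlying iterator is drained, which is how `S3ShuffleReader` attaches its statistics logging above.

```scala
// Illustrative only: count elements and report when the wrapped iterator is drained.
val underlying = Iterator(1, 2, 3)
var count = 0
val counted = new StatisticsIterator(
  underlying.map { x => count += 1; x },
  () => println(s"iterator exhausted after $count elements"))

val total = counted.sum // prints "iterator exhausted after 3 elements"; total == 6
```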