diff --git a/README.md b/README.md
index b917941..a960ab3 100644
--- a/README.md
+++ b/README.md
@@ -38,8 +38,7 @@ These configuration values need to be passed to Spark to load and configure the
 
 Changing these values might have an impact on performance.
 
-- `spark.shuffle.s3.bufferSize`: Default size of the buffered output streams (default: `32768`,
-  uses `spark.shuffle.file.buffer` as default)
+- `spark.shuffle.s3.bufferSize`: The maximum buffer size in bytes (default: `4194304`). Maximum memory consumption: `4 MiB x NUM_TASKS` at the default buffer size.
 - `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`)
 - `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`)
 - `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`)
@@ -47,16 +46,15 @@ Changing these values might have an impact on performance.
   (default: `10`, minimum: `1`).
 
   **Note**: This option can be used to optimize performance on object stores which have a prefix rate-limit.
 
-- `spark.shuffle.s3.prefetchBatchSize`: Prefetch batch size (default: `25`). Controls how many partitions are prefetched
-  concurrently per task.
-- `spark.shuffle.s3.prefetchThreadPoolSize`: Prefetch thread pool size (default: `100`). The total size of the thread
-  pool used for prefetching the shuffle blocks.
 - `spark.shuffle.s3.supportsUnbuffer`: Streams can be unbuffered instead of closed (default: `true`, if Storage-backend
   is S3A, `false` otherwise).
 - `spark.shuffle.checksum.enabled`: Enables checksums on Shuffle files (default: `true`)
 
   **Note**: This option creates additional overhead if active. Suggested configuration: `false`.
 
+- `spark.shuffle.s3.minimumReadSize`: The minimum read size, in bytes, used when reading from the remote storage (default: `1024`).
+- `spark.shuffle.s3.bufferSensitivity`: The sensitivity used when adapting the buffer size to the measured RTT (default: `100`).
+
 ### Debug configuration options
 
@@ -75,6 +73,7 @@ Configuration options used for debugging:
 
   **Note**: Can lead to invalid results.
 
+
 ## Testing
 
 The tests require the following environment variables to be set:
diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
index efc4b62..68717c1 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
@@ -34,20 +34,19 @@ class S3ShuffleDispatcher extends Logging {
   private val isS3A = rootDir.startsWith("s3a://")
 
   // Optional
-  val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024)
-  val bufferInputSize: Int = conf.getInt("spark.shuffle.s3.bufferInputSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
+  val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = 4 * 1024 * 1024)
   val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true)
   val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true)
   val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
   val folderPrefixes: Int = conf.getInt("spark.shuffle.s3.folderPrefixes", defaultValue = 10)
-  val prefetchBatchSize: Int = conf.getInt("spark.shuffle.s3.prefetchBatchSize", defaultValue = 25)
-  val prefetchThreadPoolSize: Int = conf.getInt("spark.shuffle.s3.prefetchThreadPoolSize", defaultValue = 100)
   val supportsUnbuffer: Boolean = conf.getBoolean("spark.shuffle.s3.supportsUnbuffer", defaultValue = isS3A)
 
   // Debug
   val alwaysCreateIndex: Boolean = conf.getBoolean("spark.shuffle.s3.alwaysCreateIndex", defaultValue = false)
   val useBlockManager: Boolean = conf.getBoolean("spark.shuffle.s3.useBlockManager", defaultValue = true)
   val forceBatchFetch: Boolean = conf.getBoolean("spark.shuffle.s3.forceBatchFetch", defaultValue = false)
+  val minimumReadSize: Int = conf.getInt("spark.shuffle.s3.minimumReadSize", defaultValue = 1024)
+  val bufferSensitivity: Int = conf.getInt("spark.shuffle.s3.bufferSensitivity", defaultValue = 100)
 
   // Spark feature
   val checksumAlgorithm: String = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
@@ -63,19 +62,18 @@ class S3ShuffleDispatcher extends Logging {
 
     // Optional
     logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
-    logInfo(s"- spark.shuffle.s3.bufferInputSize=${bufferInputSize}")
     logInfo(s"- spark.shuffle.s3.cachePartitionLengths=${cachePartitionLengths}")
     logInfo(s"- spark.shuffle.s3.cacheChecksums=${cacheChecksums}")
     logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
     logInfo(s"- spark.shuffle.s3.folderPrefixes=${folderPrefixes}")
-    logInfo(s"- spark.shuffle.s3.prefetchBlockSize=${prefetchBatchSize}")
-    logInfo(s"- spark.shuffle.s3.prefetchThreadPoolSize=${prefetchThreadPoolSize}")
     logInfo(s"- spark.shuffle.s3.supportsUnbuffer=${supportsUnbuffer}")
 
     // Debug
     logInfo(s"- spark.shuffle.s3.alwaysCreateIndex=${alwaysCreateIndex} (default: false)")
     logInfo(s"- spark.shuffle.s3.useBlockManager=${useBlockManager} (default: true)")
     logInfo(s"- spark.shuffle.s3.forceBatchFetch=${forceBatchFetch} (default: false)")
+    logInfo(s"- spark.shuffle.s3.minimumReadSize=${minimumReadSize}")
+    logInfo(s"- spark.shuffle.s3.bufferSensitivity=${bufferSensitivity}")
 
     // Spark
     logInfo(s"- ${config.SHUFFLE_CHECKSUM_ALGORITHM.key}=${checksumAlgorithm}")
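The keys documented above are ordinary Spark configuration entries, so they can also be set programmatically. A minimal sketch; the values are illustrative, not tuning recommendations:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the keys are the ones documented in the README above.
val conf = new SparkConf()
  .set("spark.shuffle.s3.bufferSize", (4 * 1024 * 1024).toString) // upper bound for the adaptive buffer
  .set("spark.shuffle.s3.minimumReadSize", "1024")                // lower bound for reads against remote storage
  .set("spark.shuffle.s3.bufferSensitivity", "100")               // how strongly the buffer size reacts to the RTT
  .set("spark.shuffle.s3.folderPrefixes", "10")                   // spread shuffle files over multiple prefixes
  .set("spark.shuffle.checksum.enabled", "false")                 // README suggests disabling to reduce overhead
```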
diff --git a/src/main/scala/org/apache/spark/storage/S3DoubleBufferedStream.scala b/src/main/scala/org/apache/spark/storage/S3DoubleBufferedStream.scala
index 06fd8f9..60c5551 100644
--- a/src/main/scala/org/apache/spark/storage/S3DoubleBufferedStream.scala
+++ b/src/main/scala/org/apache/spark/storage/S3DoubleBufferedStream.scala
@@ -7,14 +7,21 @@ package org.apache.spark.storage
 
 import org.apache.hadoop.io.ElasticByteBufferPool
 import org.apache.spark.SparkException
+import org.apache.spark.shuffle.helper.S3ShuffleDispatcher
 import org.apache.spark.storage.S3DoubleBufferedStream.{getBuffer, releaseBuffer}
 
 import java.io.{EOFException, InputStream}
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
 import scala.concurrent.Future
 import scala.util.{Failure, Success, Try}
+import scala.concurrent.ExecutionContext.Implicits.global
 
-class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) extends InputStream {
+class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int,
+                             timeWaiting: AtomicLong,
+                             timePrefetching: AtomicLong,
+                             bytesRead: AtomicLong,
+                             numRequests: AtomicLong) extends InputStream {
   private var buffers: Array[ByteBuffer] = {
     val array = new Array[ByteBuffer](2)
     array(0) = getBuffer(bufferSize)
@@ -25,12 +32,12 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
     })
     array
   }
-  private var prefetching = Array.fill(2)(false)
 
   var streamClosed = false
   var pos: Long = 0
   val maxBytes: Long = stream.maxBytes
+  private var bufIdx: Int = 0
 
   var dataAvailable: Boolean = false
   var error: Option[Throwable] = None
 
@@ -69,19 +76,25 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
       // no data available
       return
     }
+    // Run on implicit global execution context.
     val fut = Future[Int] {
+      val now = System.nanoTime()
       buffer.clear()
       var len: Int = 0
       do {
-        len = writeTo(buffer, stream)
+        len = writeTo(buffer, stream, bufferSize)
        if (len < 0) {
          throw new EOFException()
        }
      } while (len == 0)
      buffer.flip()
+
+      timePrefetching.addAndGet(System.nanoTime() - now)
+      numRequests.incrementAndGet()
+      bytesRead.addAndGet(len)
      len
-    }(S3ShuffleReader.asyncExecutionContext)
-    fut.onComplete(onCompletePrefetch)(S3ShuffleReader.asyncExecutionContext)
+    }
+    fut.onComplete(onCompletePrefetch)
  }
 
  private def onCompletePrefetch(result: Try[Int]): Unit = synchronized {
@@ -97,7 +110,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
     if (eof) {
       return -1
     }
-
+    val now = System.nanoTime()
     while (error.isEmpty) {
       if (buffers == null) {
         throw new EOFException("Stream already closed")
@@ -108,6 +121,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
         if (l < 0) {
           throw new SparkException("Invalid state in shuffle read.")
         }
+        timeWaiting.addAndGet(System.nanoTime() - now)
         pos += 1
         return l
       }
@@ -129,6 +143,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
     if (eof) {
       return -1
     }
+    val now = System.nanoTime()
     while (error.isEmpty) {
       if (buffers == null) {
         throw new EOFException("Stream already closed")
@@ -139,6 +154,7 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
         if (l < 0) {
           throw new SparkException("Invalid state in shuffle read(buf).")
         }
+        timeWaiting.addAndGet(System.nanoTime() - now)
         pos += l
         return l
       }
@@ -212,17 +228,18 @@ class S3DoubleBufferedStream(stream: S3ShuffleBlockStream, bufferSize: Int) exte
     buf.get() & 0xFF
   }
 
-  private def writeTo(buf: ByteBuffer, src: InputStream): Int = {
-    val len = src.read(buf.array(), buf.position() + buf.arrayOffset(), buf.remaining())
+  private def writeTo(buf: ByteBuffer, src: InputStream, size: Int): Int = {
+    val len = src.read(buf.array(), buf.position() + buf.arrayOffset(), math.min(buf.remaining(), size))
     buf.position(buf.position() + len)
     len
   }
 }
 
 object S3DoubleBufferedStream {
+
   private lazy val pool = new ElasticByteBufferPool()
 
-  private def getBuffer(size: Int) = pool.getBuffer(false, size)
+  private def getBuffer(size: Int): ByteBuffer = pool.getBuffer(false, size)
 
-  private def releaseBuffer(buf: ByteBuffer) = pool.putBuffer(buf)
+  private def releaseBuffer(buf: ByteBuffer): Unit = pool.putBuffer(buf)
 }
\ No newline at end of file
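The change above drops the dedicated prefetch thread pool in favour of the implicit global execution context and threads four `AtomicLong` counters through `S3DoubleBufferedStream` so each background buffer fill is timed. The standalone sketch below mirrors that pattern; `PrefetchTimingSketch` and its `prefetch` helper are made up for illustration and are not part of the plugin:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object PrefetchTimingSketch {
  // Counters equivalent to the ones the diff threads through S3DoubleBufferedStream.
  val timePrefetching = new AtomicLong(0) // nanoseconds spent inside read()
  val bytesRead = new AtomicLong(0)
  val numRequests = new AtomicLong(0)

  // Read one chunk on the implicit global execution context and record how long it took.
  def prefetch(src: InputStream, chunkSize: Int): Future[Int] = Future {
    val start = System.nanoTime()
    val buf = new Array[Byte](chunkSize)
    val len = src.read(buf, 0, chunkSize)
    timePrefetching.addAndGet(System.nanoTime() - start)
    numRequests.incrementAndGet()
    if (len > 0) bytesRead.addAndGet(len)
    len
  }

  def main(args: Array[String]): Unit = {
    val data = new ByteArrayInputStream(Array.fill[Byte](1 << 20)(1.toByte))
    val len = Await.result(prefetch(data, 64 * 1024), 10.seconds)
    println(s"read $len bytes in ${timePrefetching.get() / 1e6} ms over ${numRequests.get()} request(s)")
  }
}
```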
diff --git a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
index a34be6c..0e3efe7 100644
--- a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
+++ b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
@@ -28,11 +28,11 @@ import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.helper.{S3ShuffleDispatcher, S3ShuffleHelper}
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReadMetricsReporter, ShuffleReader}
 import org.apache.spark.storage.ShuffleBlockFetcherIterator.FetchBlockInfo
-import org.apache.spark.util.{CompletionIterator, ThreadUtils}
+import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.{InterruptibleIterator, SparkConf, SparkEnv, TaskContext}
 
-import scala.concurrent.{ExecutionContext}
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * This class was adapted from Apache Spark: BlockStoreShuffleReader.
@@ -53,6 +53,11 @@
   private val dispatcher = S3ShuffleDispatcher.get
   private val dep = handle.dependency
 
+  private val timeWaiting = new AtomicLong(0)
+  private val timePrefetching = new AtomicLong(0)
+  private val bytesRead = new AtomicLong(0)
+  private val numRequests = new AtomicLong(0)
+
   private val fetchContinousBlocksInBatch: Boolean = {
     val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
     val compressed = conf.get(config.SHUFFLE_COMPRESS)
@@ -82,7 +87,7 @@
       useBlockManager = dispatcher.useBlockManager)
 
     val wrappedStreams = new S3ShuffleBlockIterator(blocks)
-    val bufferSize = dispatcher.bufferSize.toInt
+    val bufferSize = dispatcher.bufferSize
 
     // Create a key/value iterator for each stream
     val streamIter = wrappedStreams.filterNot(_._2.maxBytes == 0).map { case (blockId, wrappedStream) =>
@@ -92,7 +97,11 @@
       // NextIterator. The NextIterator makes sure that close() is called on the
       // underlying InputStream when all records have been read.
-      val stream = new S3DoubleBufferedStream(wrappedStream, bufferSize)
+      val stream = new S3DoubleBufferedStream(wrappedStream, bufferSize,
+        timeWaiting = timeWaiting,
+        timePrefetching = timePrefetching,
+        bytesRead = bytesRead,
+        numRequests = numRequests)
       val checkedStream = if (dispatcher.checksumEnabled) {
         new S3ChecksumValidationStream(blockId, stream, dispatcher.checksumAlgorithm)
       } else {
@@ -102,12 +111,22 @@
       (blockId, checkedStream)
     }
 
-    val recordIter = new PrefetchIterator(streamIter).flatMap { case (blockId, stream) =>
+    val prefetchIter = new PrefetchIterator(streamIter).flatMap { case (blockId, stream) =>
       serializerInstance
         .deserializeStream(serializerManager.wrapStream(blockId, stream))
         .asKeyValueIterator
     }
 
+    val recordIter = new StatisticsIterator(prefetchIter, () => {
+      val tW = timeWaiting.get() / 1000000
+      val tP = timePrefetching.get() / 1000000
+      val bR = bytesRead.get()
+      val r = numRequests.get()
+      val tR = tP / r
+      val bW = bR.toDouble / (tP.toDouble / 1000) / (1024 * 1024)
+      logInfo(s"S3ShuffleReader statistics: ${bR} bytes read, ${tW} ms time waiting, ${tP} ms time prefetching (${tR} time/request) - ${bW} MiB/s")
+    })
+
     // Update the context task metrics for each record read.
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
       recordIter.map { record =>
@@ -181,8 +200,3 @@
     }
   }
 }
-
-object S3ShuffleReader {
-  private lazy val asyncThreadPool = ThreadUtils.newDaemonCachedThreadPool("s3-shuffle-reader-async-thread-pool", S3ShuffleDispatcher.get.prefetchThreadPoolSize)
-  lazy implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
-}
diff --git a/src/main/scala/org/apache/spark/storage/StatisticsIterator.scala b/src/main/scala/org/apache/spark/storage/StatisticsIterator.scala
new file mode 100644
index 0000000..2c01f64
--- /dev/null
+++ b/src/main/scala/org/apache/spark/storage/StatisticsIterator.scala
@@ -0,0 +1,13 @@
+package org.apache.spark.storage
+
+class StatisticsIterator[A](iter: Iterator[A], fun: () => Unit) extends Iterator[A] {
+  override def hasNext: Boolean = {
+    if (!iter.hasNext) {
+      fun()
+      return false
+    }
+    true
+  }
+
+  override def next(): A = iter.next()
+}
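The summary that the new `StatisticsIterator` callback logs can be reproduced in isolation. In the sketch below, `ShuffleReadSummarySketch` and its counter values are hypothetical; only the arithmetic follows the `logInfo` added to `S3ShuffleReader`, with an extra guard for a zero request count:

```scala
import java.util.concurrent.atomic.AtomicLong

// Standalone sketch (not the plugin's API): wrap an iterator so a summary is
// printed once it is exhausted, the same way S3ShuffleReader now wraps its
// record iterator with StatisticsIterator. All counter values are made up.
object ShuffleReadSummarySketch {
  val timeWaiting = new AtomicLong(120L * 1000000)     // ns blocked waiting for a filled buffer
  val timePrefetching = new AtomicLong(800L * 1000000) // ns spent filling buffers in the background
  val bytesRead = new AtomicLong(64L * 1024 * 1024)    // bytes fetched from remote storage
  val numRequests = new AtomicLong(32)                 // number of read requests issued

  // Same arithmetic as the added logInfo, guarded against division by zero.
  def printSummary(): Unit = {
    val tW = timeWaiting.get() / 1000000
    val tP = timePrefetching.get() / 1000000
    val bR = bytesRead.get()
    val r = numRequests.get()
    val tR = if (r > 0) tP / r else 0
    val bW = bR.toDouble / (tP.toDouble / 1000) / (1024 * 1024)
    println(s"$bR bytes read, $tW ms waiting, $tP ms prefetching ($tR ms/request) - $bW MiB/s")
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 3).iterator
    val withSummary = new Iterator[Int] {
      override def hasNext: Boolean = {
        val more = records.hasNext
        if (!more) printSummary() // fires when the wrapped iterator runs dry
        more
      }
      override def next(): Int = records.next()
    }
    withSummary.foreach(_ => ()) // drains the iterator and prints "... 80.0 MiB/s"
  }
}
```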