Scalafmt: Format source code.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed May 8, 2024
1 parent d6e400f commit b2ae548
Showing 16 changed files with 365 additions and 312 deletions.
18 changes: 11 additions & 7 deletions src/main/scala/org/apache/spark/shuffle/ConcurrentObjectMap.scala
@@ -20,13 +20,17 @@ class ConcurrentObjectMap[K, V] {
   }
 
   def getOrElsePut(key: K, op: K => V): V = {
-    val l = valueLocks.get(key).getOrElse({
-      lock.synchronized {
-        valueLocks.getOrElseUpdate(key, {
-          new Object()
-        })
-      }
-    })
+    val l = valueLocks
+      .get(key)
+      .getOrElse({
+        lock.synchronized {
+          valueLocks.getOrElseUpdate(
+            key, {
+              new Object()
+            }
+          )
+        }
+      })
     l.synchronized {
       return map.getOrElseUpdate(key, op(key))
     }
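For context, the getOrElsePut being reformatted here implements per-key locking: the global lock is taken only to create a per-key lock object, and op(key) then runs under that per-key lock, so an expensive computation happens at most once per key without serializing unrelated keys. A minimal self-contained sketch of the same pattern, assuming the internals are two mutable.Maps as the surrounding diff suggests:

```scala
import scala.collection.mutable

// Sketch of the per-key locking pattern used by ConcurrentObjectMap.
// Assumption: `lock` guards only the creation of per-key lock objects.
class PerKeyMemo[K, V] {
  private val lock = new Object()
  private val valueLocks = mutable.Map[K, Object]()
  private val map = mutable.Map[K, V]()

  def getOrElsePut(key: K, op: K => V): V = {
    // Fast path reuses an existing per-key lock; the slow path creates
    // one under the global lock so two threads cannot race on a key.
    val l = valueLocks
      .get(key)
      .getOrElse(lock.synchronized {
        valueLocks.getOrElseUpdate(key, new Object())
      })
    l.synchronized {
      map.getOrElseUpdate(key, op(key))
    }
  }
}
```

Usage would be e.g. `memo.getOrElsePut(blockId, id => openReader(id))` (hypothetical helper), where only the first caller for a given key pays the cost of the computation.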
7 changes: 4 additions & 3 deletions src/main/scala/org/apache/spark/shuffle/S3MeasureOutputStream.scala

@@ -11,7 +11,6 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
   private var timings: Long = 0
   private var bytes: Long = 0
 
-
   private def checkOpen(): Unit = {
     if (!isOpen) {
       throw new IOException("The stream is already closed!")
@@ -58,7 +57,9 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
     val sAt = tc.stageAttemptNumber()
     val t = timings / 1000000
     val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
-    logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
-      s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
+    logInfo(
+      s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
+        s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)"
+    )
   }
 }
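For reference, the statistics line above derives bandwidth from nanosecond timings: t converts to milliseconds, and bw divides bytes per second by 1024 * 1024 to get MiB/s. A worked check with hypothetical numbers (not taken from this commit):

```scala
// Hypothetical figures to sanity-check the MiB/s formula used above.
val timings: Long = 2500000000L       // 2.5 s accumulated, in nanoseconds
val bytes: Long = 512L * 1024 * 1024  // 512 MiB written
val t = timings / 1000000             // = 2500 ms
val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
println(f"$bw%.1f MiB/s")             // prints "204.8 MiB/s"
```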
7 changes: 3 additions & 4 deletions src/main/scala/org/apache/spark/shuffle/S3ShuffleDataIO.scala
@@ -36,9 +36,9 @@ class S3ShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
     }
 
     override def createSingleFileMapOutputWriter(
-                                                  shuffleId: Int,
-                                                  mapId: Long
-                                                ): Optional[SingleSpillShuffleMapOutputWriter] = {
+        shuffleId: Int,
+        mapId: Long
+    ): Optional[SingleSpillShuffleMapOutputWriter] = {
       Optional.of(new S3SingleSpillShuffleMapOutputWriter(shuffleId, mapId))
     }
   }
@@ -67,4 +67,3 @@ class S3ShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
     }
   }
 }
-
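For context, a ShuffleDataIO implementation such as S3ShuffleDataIO is activated through Spark's standard shuffle IO plugin setting. A hedged sketch (the class and package name come from this diff's file path; everything else is illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative only: point Spark's shuffle IO plugin at S3ShuffleDataIO.
val conf = new SparkConf()
  .set("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")
```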
58 changes: 29 additions & 29 deletions src/main/scala/org/apache/spark/shuffle/S3ShuffleMapOutputWriter.scala

@@ -19,19 +19,18 @@ import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
 import java.util.Optional
 
-/**
- * Implements the ShuffleMapOutputWriter interface. It stores the shuffle output in one
- * shuffle block.
- *
- * This file is based on Spark "LocalDiskShuffleMapOutputWriter.java".
- */
+/** Implements the ShuffleMapOutputWriter interface. It stores the shuffle output in one shuffle block.
+  *
+  * This file is based on Spark "LocalDiskShuffleMapOutputWriter.java".
+  */
 
 class S3ShuffleMapOutputWriter(
-    conf: SparkConf,
-    shuffleId: Int,
-    mapId: Long,
-    numPartitions: Int,
-) extends ShuffleMapOutputWriter with Logging {
+    conf: SparkConf,
+    shuffleId: Int,
+    mapId: Long,
+    numPartitions: Int
+) extends ShuffleMapOutputWriter
+    with Logging {
   val dispatcher = S3ShuffleDispatcher.get
 
   /* Target block for writing */
@@ -44,7 +43,8 @@ class S3ShuffleMapOutputWriter(
   def initStream(): Unit = {
     if (stream == null) {
       stream = dispatcher.createBlock(shuffleBlock)
-      bufferedStream = new S3MeasureOutputStream(new BufferedOutputStream(stream, dispatcher.bufferSize), shuffleBlock.name)
+      bufferedStream =
+        new S3MeasureOutputStream(new BufferedOutputStream(stream, dispatcher.bufferSize), shuffleBlock.name)
     }
   }
 
@@ -59,10 +59,11 @@ class S3ShuffleMapOutputWriter(
   private var totalBytesWritten: Long = 0
   private var lastPartitionWriterId: Int = -1
 
-  /**
-   * @param reducePartitionId Monotonically increasing, as per contract in ShuffleMapOutputWriter.
-   * @return An instance of the ShufflePartitionWriter exposing the single output stream.
-   */
+  /** @param reducePartitionId
+    *   Monotonically increasing, as per contract in ShuffleMapOutputWriter.
+    * @return
+    *   An instance of the ShufflePartitionWriter exposing the single output stream.
+    */
   override def getPartitionWriter(reducePartitionId: Int): ShufflePartitionWriter = {
     if (reducePartitionId <= lastPartitionWriterId) {
       throw new RuntimeException("Precondition: Expect a monotonically increasing reducePartitionId.")
@@ -81,19 +82,21 @@ class S3ShuffleMapOutputWriter(
     new S3ShufflePartitionWriter(reducePartitionId)
   }
 
-  /**
-   * Close all writers and the shuffle block.
-   *
-   * @param checksums Ignored.
-   * @return
-   */
+  /** Close all writers and the shuffle block.
+    *
+    * @param checksums
+    *   Ignored.
+    * @return
+    */
   override def commitAllPartitions(checksums: Array[Long]): MapOutputCommitMessage = {
     if (bufferedStream != null) {
       bufferedStream.flush()
     }
     if (stream != null) {
       if (stream.getPos != totalBytesWritten) {
-        throw new RuntimeException(f"S3ShuffleMapOutputWriter: Unexpected output length ${stream.getPos}, expected: ${totalBytesWritten}.")
+        throw new RuntimeException(
+          f"S3ShuffleMapOutputWriter: Unexpected output length ${stream.getPos}, expected: ${totalBytesWritten}."
+        )
       }
     }
     if (bufferedStreamAsChannel != null) {
@@ -198,8 +201,7 @@ class S3ShuffleMapOutputWriter(
     }
   }
 
-  private class S3ShufflePartitionWriterChannel(reduceId: Int)
-    extends WritableByteChannelWrapper {
+  private class S3ShufflePartitionWriterChannel(reduceId: Int) extends WritableByteChannelWrapper {
     private val partChannel = new S3PartitionWritableByteChannel(bufferedStreamAsChannel)
 
     override def channel(): WritableByteChannel = {
@@ -216,8 +218,7 @@ class S3ShuffleMapOutputWriter(
     }
   }
 
-  private class S3PartitionWritableByteChannel(channel: WritableByteChannel)
-    extends WritableByteChannel {
+  private class S3PartitionWritableByteChannel(channel: WritableByteChannel) extends WritableByteChannel {
 
     private var count: Long = 0
 
@@ -229,8 +230,7 @@ class S3ShuffleMapOutputWriter(
       channel.isOpen()
     }
 
-    override def close(): Unit = {
-    }
+    override def close(): Unit = {}
 
     override def write(x: ByteBuffer): Int = {
       var c = 0
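The precondition in getPartitionWriter above encodes the ShuffleMapOutputWriter contract: partition writers are requested with strictly increasing reducePartitionId, once each, and the run ends with a single commitAllPartitions. A hedged caller-side sketch against Spark's shuffle API (the dataFor helper is hypothetical):

```scala
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter

// Sketch of the calling convention enforced above: ascending partition
// ids, one writer per partition, then one commit with checksums
// (which this implementation ignores).
def writeAllPartitions(writer: ShuffleMapOutputWriter, numPartitions: Int, dataFor: Int => Array[Byte]): Unit = {
  for (reduceId <- 0 until numPartitions) { // strictly increasing, per contract
    val out = writer.getPartitionWriter(reduceId).openStream()
    try out.write(dataFor(reduceId))
    finally out.close()
  }
  writer.commitAllPartitions(Array.fill(numPartitions)(0L))
}
```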
1 change: 0 additions & 1 deletion src/main/scala/org/apache/spark/shuffle/S3ShuffleWriter.scala

@@ -19,4 +19,3 @@ class S3ShuffleWriter[K, V](writer: ShuffleWriter[K, V]) extends ShuffleWriter[K
 
   override def getPartitionLengths(): Array[Long] = writer.getPartitionLengths()
 }
-
18 changes: 11 additions & 7 deletions src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala

@@ -15,15 +15,17 @@ import org.apache.spark.util.Utils
 import java.io.{File, FileInputStream}
 import java.nio.file.{Files, Path}
 
-class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends SingleSpillShuffleMapOutputWriter with Logging {
+class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long)
+    extends SingleSpillShuffleMapOutputWriter
+    with Logging {
 
   private lazy val dispatcher = S3ShuffleDispatcher.get
 
   override def transferMapSpillFile(
-                                     mapSpillFile: File,
-                                     partitionLengths: Array[Long],
-                                     checksums: Array[Long]
-                                   ): Unit = {
+      mapSpillFile: File,
+      partitionLengths: Array[Long],
+      checksums: Array[Long]
+  ): Unit = {
     val block = ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
 
     if (dispatcher.rootIsLocal) {
@@ -44,8 +46,10 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
       val sAt = tc.stageAttemptNumber()
       val t = timings / 1000000
       val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
-      logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
-        s"Writing ${block.name} ${bytes} took ${t} ms (${bw} MiB/s)")
+      logInfo(
+        s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
+          s"Writing ${block.name} ${bytes} took ${t} ms (${bw} MiB/s)"
+      )
     } else {
       // Copy using a stream.
       val in = new FileInputStream(mapSpillFile)
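The else branch above copies the spill file to a non-local shuffle root through a stream rather than a filesystem move. For context, a minimal sketch of such a copy using Utils.copyStream, which this file imports; the destination stream is left abstract here, since the dispatcher API is project-specific:

```scala
import java.io.{File, FileInputStream, OutputStream}
import org.apache.spark.util.Utils

// Sketch: stream a local spill file into an already-opened destination.
// closeStreams = true closes both streams when the copy completes;
// the return value is the number of bytes copied.
def copySpill(mapSpillFile: File, dest: OutputStream): Long =
  Utils.copyStream(new FileInputStream(mapSpillFile), dest, closeStreams = true)
```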