Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplified handling of GPU core dumps #9238

Merged
merged 9 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dist/unshimmed-common-from-spark311.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ com/nvidia/spark/RapidsUDF*
com/nvidia/spark/Retryable*
com/nvidia/spark/SQLPlugin*
com/nvidia/spark/rapids/ColumnarRdd*
com/nvidia/spark/rapids/GpuColumnVectorUtils*
com/nvidia/spark/rapids/ExplainPlan.class
com/nvidia/spark/rapids/ExplainPlan$.class
com/nvidia/spark/rapids/ExplainPlanBase.class
com/nvidia/spark/rapids/GpuColumnVectorUtils*
com/nvidia/spark/rapids/GpuCoreDumpMsg*
jlowe marked this conversation as resolved.
Show resolved Hide resolved
com/nvidia/spark/rapids/GpuKryoRegistrator*
com/nvidia/spark/rapids/PlanUtils*
com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import java.io.File
import java.lang.management.ManagementFactory
import java.nio.file.Files
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}

import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.shims.NullOutputStreamShim
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.util.SerializableConfiguration

/** Messages sent between executor and driver to coordinate GPU core dump handling. */
trait GpuCoreDumpMsg
/**
 * Sent from executor to driver when a GPU core dump starts. The driver replies with a
 * SerializableConfiguration for writing the dump (see GpuCoreDumpHandler.handleMsg).
 */
case class GpuCoreDumpMsgStart(executorId: String, dumpPath: String) extends GpuCoreDumpMsg
/** Sent from executor to driver after a GPU core dump has been fully written. */
case class GpuCoreDumpMsgCompleted(executorId: String, dumpPath: String) extends GpuCoreDumpMsg

object GpuCoreDumpHandler extends Logging {
  // Background thread that streams dump data out of the named pipe, if enabled.
  private var executor: Option[ExecutorService] = None
  // Destination of the last completed dump, if any.
  private var dumpedPath: Option[String] = None
  // FIFO that CUDA writes the core dump into; null until executorInit runs.
  private var namedPipeFile: File = _
  // True while a dump is actively being copied from the pipe.
  private var isDumping: Boolean = false

  /**
   * Configures the executor launch environment for GPU core dumps, if applicable.
   * Should only be called from the driver on driver startup.
   */
  def driverInit(sc: SparkContext, conf: RapidsConf): Unit = {
    // GPU core dumps are enabled only when a dump directory has been configured.
    conf.gpuCoreDumpDir.foreach { _ =>
      TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_COREDUMP_ON_EXCEPTION", "1")
      TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_CPU_COREDUMP_ON_EXCEPTION", "0")
      TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_LIGHTWEIGHT_COREDUMP",
        if (conf.isGpuCoreDumpFull) "0" else "1")
      // Point CUDA at the named pipe pattern; %p is expanded to the process ID, matching
      // the expansion done in createNamedPipe on the executor side.
      TrampolineUtil.setExecutorEnv(sc, "CUDA_COREDUMP_FILE", conf.gpuCoreDumpPipePattern)
      TrampolineUtil.setExecutorEnv(sc, "CUDA_COREDUMP_SHOW_PROGRESS", "1")
    }
  }

  /**
   * Sets up the GPU core dump background copy thread, if applicable.
   * Should only be called from the executor on executor startup.
   */
  def executorInit(rapidsConf: RapidsConf, pluginCtx: PluginContext): Unit = {
    rapidsConf.gpuCoreDumpDir.foreach { dumpDir =>
      namedPipeFile = createNamedPipe(rapidsConf)
      executor = Some(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
        .setNameFormat("gpu-core-copier")
        .setDaemon(true)
        .build()))
      executor.foreach { exec =>
        val codec = if (rapidsConf.isGpuCoreDumpCompressed) {
          Some(TrampolineUtil.createCodec(pluginCtx.conf(),
            rapidsConf.gpuCoreDumpCompressionCodec))
        } else {
          None
        }
        // Append the codec's short name (e.g. ".zstd") so the dump file is self-describing.
        val suffix = codec.map { c =>
          "." + TrampolineUtil.getCodecShortName(c.getClass.getName)
        }.getOrElse("")
        exec.submit(new Runnable {
          override def run(): Unit = {
            try {
              copyLoop(pluginCtx, namedPipeFile, new Path(dumpDir), codec, suffix)
            } catch {
              case _: InterruptedException => logInfo("Stopping GPU core dump copy thread")
              case t: Throwable => logWarning("Error in GPU core dump copy thread", t)
            }
          }
        })
      }
    }
  }

  /**
   * Wait for a GPU dump in progress, if any, to complete
   * @param timeoutSecs maximum amount of time to wait before returning
   * @return false if the wait timed out while a dump was still in progress, true otherwise
   */
  def waitForDump(timeoutSecs: Int): Boolean = {
    val endTime = System.nanoTime + TimeUnit.SECONDS.toNanos(timeoutSecs)
    // Poll rather than block: the copy thread owns the pipe and clears isDumping when done.
    while (isDumping && System.nanoTime < endTime) {
      Thread.sleep(10)
    }
    System.nanoTime < endTime
  }

  /** Stops the copy thread and removes the named pipe. Safe to call when never initialized. */
  def shutdown(): Unit = {
    executor.foreach { exec =>
      exec.shutdownNow()
      executor = None
      namedPipeFile.delete()
      namedPipeFile = null
    }
  }

  /**
   * Driver-side handler for GPU core dump messages sent from executors.
   * Returns the reply to send back (a SerializableConfiguration for start messages).
   */
  def handleMsg(msg: GpuCoreDumpMsg): AnyRef = msg match {
    case GpuCoreDumpMsgStart(executorId, dumpPath) =>
      logError(s"Executor $executorId starting a GPU core dump to $dumpPath")
      val spark = SparkSession.active
      // Executor needs the driver's Hadoop config to write to the dump filesystem.
      new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
    case GpuCoreDumpMsgCompleted(executorId, dumpPath) =>
      logError(s"Executor $executorId wrote a GPU core dump to $dumpPath")
      null
    case m =>
      throw new IllegalStateException(s"Unexpected GPU core dump msg: $m")
  }

  // visible for testing
  def getNamedPipeFile: File = namedPipeFile

  /** Creates the FIFO that CUDA will write the core dump into for this process. */
  private def createNamedPipe(conf: RapidsConf): File = {
    // RuntimeMXBean name has the form "pid@hostname"; extract the pid to expand %p,
    // mirroring the %p expansion CUDA performs on CUDA_COREDUMP_FILE.
    val processName = ManagementFactory.getRuntimeMXBean.getName
    val pidstr = processName.substring(0, processName.indexOf("@"))
    val pipePath = conf.gpuCoreDumpPipePattern.replace("%p", pidstr)
    val pipeFile = new File(pipePath)
    // Mode 600 restricts the pipe to this user since dumps may contain sensitive data.
    val mkFifoProcess = Runtime.getRuntime.exec(Array("mkfifo", "-m", "600", pipeFile.toString))
    require(mkFifoProcess.waitFor(10, TimeUnit.SECONDS), "mkfifo timed out")
    // waitFor only proves the process exited; verify it actually succeeded.
    require(mkFifoProcess.exitValue == 0,
      s"mkfifo failed with exit code ${mkFifoProcess.exitValue}")
    pipeFile.deleteOnExit()
    pipeFile
  }

  /**
   * Blocks until a GPU core dump appears on the named pipe, then streams it (optionally
   * compressed) to the configured dump directory, notifying the driver at start and end.
   */
  private def copyLoop(
      pluginCtx: PluginContext,
      namedPipe: File,
      dumpDirPath: Path,
      codec: Option[CompressionCodec],
      suffix: String): Unit = {
    try {
      logInfo(s"Monitoring ${namedPipe.getAbsolutePath} for GPU core dumps")
      // Opening a FIFO for read blocks until a writer (CUDA) connects, so reaching the
      // body of withResource means a dump has started.
      withResource(new java.io.FileInputStream(namedPipe)) { in =>
        isDumping = true
        val appId = pluginCtx.conf.get("spark.app.id")
        val executorId = pluginCtx.executorID()
        val dumpPath = new Path(dumpDirPath,
          s"gpucore-$appId-$executorId.nvcudmp$suffix")
        logError(s"Generating GPU core dump at $dumpPath")
        // Ask the driver for the Hadoop configuration to use for the dump filesystem.
        val hadoopConf = pluginCtx.ask(GpuCoreDumpMsgStart(executorId, dumpPath.toString))
          .asInstanceOf[SerializableConfiguration].value
        val dumpFs = dumpPath.getFileSystem(hadoopConf)
        val bufferSize = hadoopConf.getInt("io.file.buffer.size", 4096)
        // Owner read/write only; dumps may contain sensitive application data.
        val perms = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)
        val fsOut = dumpFs.create(dumpPath, perms, false, bufferSize,
          dumpFs.getDefaultReplication(dumpPath), dumpFs.getDefaultBlockSize(dumpPath), null)
        val out = closeOnExcept(fsOut) { _ =>
          codec.map(_.compressedOutputStream(fsOut)).getOrElse(fsOut)
        }
        withResource(out) { _ =>
          IOUtils.copy(in, out)
        }
        dumpedPath = Some(dumpPath.toString)
        pluginCtx.send(GpuCoreDumpMsgCompleted(executorId, dumpedPath.get))
      }
    } catch {
      case e: Exception =>
        logError("Error copying GPU dump", e)
    } finally {
      isDumping = false
    }
    // Always drain the pipe to avoid blocking the thread that triggers the coredump
    while (namedPipe.exists()) {
      Files.copy(namedPipe.toPath, NullOutputStreamShim.INSTANCE)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
s"Rpc message $msg received, but shuffle heartbeat manager not configured.")
}
rapidsShuffleHeartbeatManager.executorHeartbeat(id)
case m: GpuCoreDumpMsg => GpuCoreDumpHandler.handleMsg(m)
case m => throw new IllegalStateException(s"Unknown message $m")
}
}
Expand All @@ -279,6 +280,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
RapidsPluginUtils.fixupConfigsOnDriver(sparkConf)
val conf = new RapidsConf(sparkConf)
RapidsPluginUtils.logPluginMode(conf)
GpuCoreDumpHandler.driverInit(sc, conf)

if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) {
GpuShuffleEnv.initShuffleManager()
Expand Down Expand Up @@ -351,6 +353,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
}
}

GpuCoreDumpHandler.executorInit(conf, pluginContext)

// we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize
// on executor startup.
if (!GpuDeviceManager.rmmTaskInitEnabled) {
Expand Down Expand Up @@ -475,6 +479,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
Option(rapidsShuffleHeartbeatEndpoint).foreach(_.close())
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
GpuCoreDumpHandler.shutdown()
}

override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
Expand All @@ -487,6 +492,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
case Some(e) if containsCudaFatalException(e) =>
logError("Stopping the Executor based on exception being a fatal CUDA error: " +
s"${ef.toErrorString}")
GpuCoreDumpHandler.waitForDump(timeoutSecs = 60)
logGpuDebugInfoAndExit(systemExitCode = 20)
case Some(_: CudaException) =>
logDebug(s"Executor onTaskFailed because of a non-fatal CUDA error: " +
Expand Down
46 changes: 46 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,42 @@ object RapidsConf {
.integerConf
.createWithDefault(2)

tgravescs marked this conversation as resolved.
Show resolved Hide resolved
// Opt-in switch for GPU core dump handling; dumps are disabled when unset.
val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir")
  .doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " +
    "an exception. The filename will be of the form gpucore-<appID>-<executorID>.nvcudmp, " +
    "where <appID> is the Spark application ID and <executorID> is the executor ID.")
  .internal()
  .stringConf
  .createOptional

// Pattern for the FIFO used to stream the dump from CUDA to the copy thread.
val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
  .doc("The pattern to use to generate the named pipe path. Occurrences of %p in the pattern " +
    "will be replaced with the process ID of the executor.")
  .internal()
  .stringConf
  .createWithDefault("gpucorepipe.%p")

// Maps to CUDA_ENABLE_LIGHTWEIGHT_COREDUMP: false selects the lightweight dump form.
val GPU_COREDUMP_FULL = conf("spark.rapids.gpu.coreDump.full")
.doc("If true, GPU coredumps will be a full coredump (i.e.: with local, shared, and global " +
"memory).")
.internal()
.booleanConf
.createWithDefault(false)

// Codec short name resolved via Spark's CompressionCodec factory (see TrampolineUtil).
val GPU_COREDUMP_COMPRESSION_CODEC = conf("spark.rapids.gpu.coreDump.compression.codec")
.doc("The codec used to compress GPU core dumps. Spark provides the codecs " +
"lz4, lzf, snappy, and zstd.")
.internal()
.stringConf
.createWithDefault("zstd")

// When enabled, dumps are streamed through the codec chosen above before landing on disk.
val GPU_COREDUMP_COMPRESS = conf("spark.rapids.gpu.coreDump.compress")
.doc("If true, GPU coredumps will be compressed using the compression codec specified " +
s"in $GPU_COREDUMP_COMPRESSION_CODEC")
.internal()
.booleanConf
.createWithDefault(true)

private val RMM_ALLOC_MAX_FRACTION_KEY = "spark.rapids.memory.gpu.maxAllocFraction"
private val RMM_ALLOC_MIN_FRACTION_KEY = "spark.rapids.memory.gpu.minAllocFraction"
private val RMM_ALLOC_RESERVE_KEY = "spark.rapids.memory.gpu.reserve"
Expand Down Expand Up @@ -2241,6 +2277,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val gpuOomMaxRetries: Int = get(GPU_OOM_MAX_RETRIES)

// GPU core dump configuration accessors, consumed by GpuCoreDumpHandler.
lazy val gpuCoreDumpDir: Option[String] = get(GPU_COREDUMP_DIR)

lazy val gpuCoreDumpPipePattern: String = get(GPU_COREDUMP_PIPE_PATTERN)

lazy val isGpuCoreDumpFull: Boolean = get(GPU_COREDUMP_FULL)

lazy val isGpuCoreDumpCompressed: Boolean = get(GPU_COREDUMP_COMPRESS)

lazy val gpuCoreDumpCompressionCodec: String = get(GPU_COREDUMP_COMPRESSION_CODEC)

lazy val isUvmEnabled: Boolean = get(UVM_ENABLED)

lazy val isPooledMemEnabled: Boolean = get(POOLED_MEM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics
import org.apache.spark.internal.config.EXECUTOR_ID
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
Expand Down Expand Up @@ -173,4 +174,14 @@ object TrampolineUtil {
/** Returns the SQLConf of the given session via its sqlContext. */
def getSparkConf(spark: SparkSession): SQLConf = spark.sqlContext.conf

/**
 * Registers a key/value pair in the SparkContext's executor environment map,
 * used here to propagate CUDA core dump settings to executors.
 */
def setExecutorEnv(sc: SparkContext, key: String, value: String): Unit = {
  sc.executorEnvs.update(key, value)
}

/**
 * Creates a Spark CompressionCodec for the given codec name.
 * Trampoline into Spark's codec factory, which is not public API.
 */
def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
  CompressionCodec.createCodec(conf, codecName)
}

/** Returns Spark's short name (e.g. "zstd") for the given codec class name. */
def getCodecShortName(codecName: String): String = CompressionCodec.getShortName(codecName)
}
Loading