From c90ca76e16fd832edc24b4b07af731706da85016 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 8 Sep 2023 15:51:09 -0500 Subject: [PATCH 1/8] Simplified handling of GPU core dumps Signed-off-by: Jason Lowe --- dist/unshimmed-common-from-spark311.txt | 3 +- .../spark/rapids/GpuCoreDumpHandler.scala | 170 +++++++++++++++++ .../com/nvidia/spark/rapids/Plugin.scala | 5 + .../com/nvidia/spark/rapids/RapidsConf.scala | 45 +++++ .../sql/rapids/execution/TrampolineUtil.scala | 13 +- .../rapids/GpuCoreDumpHandlerSuite.scala | 177 ++++++++++++++++++ 6 files changed, 411 insertions(+), 2 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala diff --git a/dist/unshimmed-common-from-spark311.txt b/dist/unshimmed-common-from-spark311.txt index b044c2cb4c2..68c128ab30b 100644 --- a/dist/unshimmed-common-from-spark311.txt +++ b/dist/unshimmed-common-from-spark311.txt @@ -7,10 +7,11 @@ com/nvidia/spark/ParquetCachedBatchSerializer* com/nvidia/spark/RapidsUDF* com/nvidia/spark/SQLPlugin* com/nvidia/spark/rapids/ColumnarRdd* -com/nvidia/spark/rapids/GpuColumnVectorUtils* com/nvidia/spark/rapids/ExplainPlan.class com/nvidia/spark/rapids/ExplainPlan$.class com/nvidia/spark/rapids/ExplainPlanBase.class +com/nvidia/spark/rapids/GpuColumnVectorUtils* +com/nvidia/spark/rapids/GpuCoreDumpMsg* com/nvidia/spark/rapids/GpuKryoRegistrator* com/nvidia/spark/rapids/PlanUtils* com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg* diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala new file mode 100644 index 00000000000..f46e4f9ca81 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids + +import java.io.File +import java.lang.management.ManagementFactory +import java.nio.file.Files +import java.util.concurrent.{Executors, ExecutorService, TimeUnit} + +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.shims.NullOutputStreamShim +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.{FsAction, FsPermission} + +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.util.SerializableConfiguration + +trait GpuCoreDumpMsg +case class GpuCoreDumpMsgStart(executorId: String, dumpPath: String) extends GpuCoreDumpMsg +case class GpuCoreDumpMsgCompleted(executorId: String, dumpPath: String) extends GpuCoreDumpMsg + +object GpuCoreDumpHandler extends Logging { + private var executor: Option[ExecutorService] = None + private var dumpedPath: Option[String] = None + private var namedPipeFile: File = _ + + /** + * Configures the executor launch environment for GPU core dumps, if applicable. + * Should only be called from the driver on driver startup. + */ + def driverInit(sc: SparkContext, conf: RapidsConf): Unit = { + conf.gpuCoreDumpDir.foreach { _ => + TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_COREDUMP_ON_EXCEPTION", "1") + TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_CPU_COREDUMP_ON_EXCEPTION", "0") + TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_LIGHTWEIGHT_COREDUMP", + if (conf.isGpuCoreDumpFull) "0" else "1") + TrampolineUtil.setExecutorEnv(sc, "CUDA_COREDUMP_FILE", conf.gpuCoreDumpPipePattern) + TrampolineUtil.setExecutorEnv(sc, "CUDA_COREDUMP_SHOW_PROGRESS", "1") + } + } + + /** + * Sets up the GPU core dump background copy thread, if applicable. + * Should only be called from the executor on executor startup. + */ + def executorInit(rapidsConf: RapidsConf, pluginCtx: PluginContext): Unit = { + rapidsConf.gpuCoreDumpDir.foreach { dumpDir => + namedPipeFile = createNamedPipe(rapidsConf) + executor = Some(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("gpu-core-copier") + .setDaemon(true) + .build())) + executor.foreach { exec => + val codec = if (rapidsConf.isGpuCoreDumpCompressed) { + Some(TrampolineUtil.createCodec(pluginCtx.conf(), + rapidsConf.gpuCoreDumpCompressionCodec)) + } else { + None + } + val suffix = codec.map { c => + "." + TrampolineUtil.getCodecShortName(c.getClass.getName) + }.getOrElse("") + exec.submit(new Runnable { + override def run(): Unit = { + try { + copyLoop(pluginCtx, namedPipeFile, new Path(dumpDir), codec, suffix) + } catch { + case _: InterruptedException => logInfo("Stopping GPU core dump copy thread") + case t: Throwable => logWarning("Error in GPU core dump copy thread", t) + } + } + }) + } + } + } + + def shutdown(): Unit = { + executor.foreach { exec => + exec.shutdownNow() + executor = None + namedPipeFile.delete() + namedPipeFile = null + } + } + + def handleMsg(msg: GpuCoreDumpMsg): AnyRef = msg match { + case GpuCoreDumpMsgStart(executorId, dumpPath) => + logError(s"Executor $executorId starting a GPU core dump to $dumpPath") + val spark = SparkSession.active + new SerializableConfiguration(spark.sparkContext.hadoopConfiguration) + case GpuCoreDumpMsgCompleted(executorId, dumpPath) => + logError(s"Executor $executorId wrote a GPU core dump to $dumpPath") + null + case m => + throw new IllegalStateException(s"Unexpected GPU core dump msg: $m") + } + + // visible for testing + def getNamedPipeFile: File = namedPipeFile + + private def createNamedPipe(conf: RapidsConf): File = { + val processName = ManagementFactory.getRuntimeMXBean.getName + val pidstr = processName.substring(0, processName.indexOf("@")) + val pipePath = conf.gpuCoreDumpPipePattern.replace("%p", pidstr) + val pipeFile = new File(pipePath) + val mkFifoProcess = Runtime.getRuntime.exec(Array("mkfifo", "-m", "600", pipeFile.toString)) + require(mkFifoProcess.waitFor(10, TimeUnit.SECONDS), "mkfifo timed out") + pipeFile.deleteOnExit() + pipeFile + } + + private def copyLoop( + pluginCtx: PluginContext, + namedPipe: File, + dumpDirPath: Path, + codec: Option[CompressionCodec], + suffix: String): Unit = { + try { + logInfo(s"Monitoring ${namedPipe.getAbsolutePath} for GPU core dumps") + withResource(new java.io.FileInputStream(namedPipe)) { in => + val appId = pluginCtx.conf.get("spark.app.id") + val executorId = pluginCtx.executorID() + val dumpPath = new Path(dumpDirPath, + s"gpucore-$appId-$executorId.nvcudmp$suffix") + logError(s"Generating GPU core dump at $dumpPath") + val hadoopConf = pluginCtx.ask(GpuCoreDumpMsgStart(executorId, dumpPath.toString)) + .asInstanceOf[SerializableConfiguration].value + val dumpFs = dumpPath.getFileSystem(hadoopConf) + val bufferSize = hadoopConf.getInt("io.file.buffer.size", 4096) + val perms = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE) + val fsOut = dumpFs.create(dumpPath, perms, false, bufferSize, + dumpFs.getDefaultReplication(dumpPath), dumpFs.getDefaultBlockSize(dumpPath), null) + val out = closeOnExcept(fsOut) { _ => + codec.map(_.compressedOutputStream(fsOut)).getOrElse(fsOut) + } + withResource(out) { _ => + IOUtils.copy(in, out) + } + dumpedPath = Some(dumpPath.toString) + pluginCtx.send(GpuCoreDumpMsgCompleted(executorId, dumpedPath.get)) + } + } catch { + case e: Exception => + logError("Error copying GPU dump", e) + } + // Always drain the pipe to avoid blocking the thread that triggers the coredump + while (namedPipe.exists()) { + Files.copy(namedPipe.toPath, NullOutputStreamShim.INSTANCE) + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index ac7228b7045..4886603dcad 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -269,6 +269,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging { s"Rpc message $msg received, but shuffle heartbeat manager not configured.") } rapidsShuffleHeartbeatManager.executorHeartbeat(id) + case m: GpuCoreDumpMsg => GpuCoreDumpHandler.handleMsg(m) case m => throw new IllegalStateException(s"Unknown message $m") } } @@ -279,6 +280,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging { RapidsPluginUtils.fixupConfigsOnDriver(sparkConf) val conf = new RapidsConf(sparkConf) RapidsPluginUtils.logPluginMode(conf) + GpuCoreDumpHandler.driverInit(sc, conf) if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) { GpuShuffleEnv.initShuffleManager() @@ -351,6 +353,8 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } } + GpuCoreDumpHandler.executorInit(conf, pluginContext) + // we rely on the Rapids Plugin being run with 1 GPU per executor so we can initialize // on executor startup. if (!GpuDeviceManager.rmmTaskInitEnabled) { @@ -475,6 +479,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { Option(rapidsShuffleHeartbeatEndpoint).foreach(_.close()) extraExecutorPlugins.foreach(_.shutdown()) FileCache.shutdown() + GpuCoreDumpHandler.shutdown() } override def onTaskFailed(failureReason: TaskFailedReason): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 9fe3793cffe..7022e85635b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -411,6 +411,41 @@ object RapidsConf { .integerConf .createWithDefault(2) + val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir") + .doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " + + "an exception. The filename will be of the form gpucore--.nvcudmp, " + + "where is the Spark application ID and is the executor ID.") + .internal() + .stringConf + .createOptional + + val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") + .doc("The pattern to use to generate the named pipe path. Occurrences of %p in the pattern " + + "will be replaced with the process ID of the executor.") + .internal + .stringConf + .createWithDefault("gpucorepipe.%p") + + val GPU_COREDUMP_FULL = conf("spark.rapids.gpu.coreDump.full") + .doc("If true, GPU coredumps will be a full coredump (i.e.: with local, shared, and global " + + "memory).") + .internal() + .booleanConf + .createWithDefault(false) + + val GPU_COREDUMP_COMPRESSION_CODEC = conf("spark.rapids.gpu.coreDump.compression.codec") + .doc("The codec used to compress GPU core dumps. Spark provides the codecs " + + "lz4, lzf, snappy, and zstd.") + .stringConf + .createWithDefault("zstd") + + val GPU_COREDUMP_COMPRESS = conf("spark.rapids.gpu.coreDump.compress") + .doc("If true, GPU coredumps will be compressed using the compression codec specified " + + s"in $GPU_COREDUMP_COMPRESSION_CODEC") + .internal() + .booleanConf + .createWithDefault(true) + private val RMM_ALLOC_MAX_FRACTION_KEY = "spark.rapids.memory.gpu.maxAllocFraction" private val RMM_ALLOC_MIN_FRACTION_KEY = "spark.rapids.memory.gpu.minAllocFraction" private val RMM_ALLOC_RESERVE_KEY = "spark.rapids.memory.gpu.reserve" @@ -2241,6 +2276,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val gpuOomMaxRetries: Int = get(GPU_OOM_MAX_RETRIES) + lazy val gpuCoreDumpDir: Option[String] = get(GPU_COREDUMP_DIR) + + lazy val gpuCoreDumpPipePattern: String = get(GPU_COREDUMP_PIPE_PATTERN) + + lazy val isGpuCoreDumpFull: Boolean = get(GPU_COREDUMP_FULL) + + lazy val isGpuCoreDumpCompressed: Boolean = get(GPU_COREDUMP_COMPRESS) + + lazy val gpuCoreDumpCompressionCodec: String = get(GPU_COREDUMP_COMPRESSION_CODEC) + lazy val isUvmEnabled: Boolean = get(UVM_ENABLED) lazy val isPooledMemEnabled: Boolean = get(POOLED_MEM) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index c5665ffb7bf..9a732184f55 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.rapids.execution import org.json4s.JsonAST - import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkUpgradeException, TaskContext} + import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.config.EXECUTOR_ID +import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -173,4 +174,14 @@ object TrampolineUtil { def getSparkConf(spark: SparkSession): SQLConf = { spark.sqlContext.conf } + + def setExecutorEnv(sc: SparkContext, key: String, value: String): Unit = { + sc.executorEnvs(key) = value + } + + def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { + CompressionCodec.createCodec(conf, codecName) + } + + def getCodecShortName(codecName: String): String = CompressionCodec.getShortName(codecName) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala new file mode 100644 index 00000000000..c63c94bedbb --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream, InputStream} +import java.util.concurrent.TimeUnit + +import com.nvidia.spark.rapids.Arm.withResource +import org.apache.hadoop.conf.Configuration +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{timeout, verify, when} +import org.scalatest.funsuite.AnyFunSuite +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.SparkConf +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.util.SerializableConfiguration + +class GpuCoreDumpHandlerSuite extends AnyFunSuite { + private val WAIT_MSECS = TimeUnit.SECONDS.toMillis(10) + private val APP_ID = "app-1234-567" + private val EXECUTOR_ID = "0" + private val FAKE_DUMP_SIZE = 1024*1024L + private val FAKE_DUMP_BYTES = "abc123".getBytes + private var pipeNum = 0 + + test("test no dump handler by default") { + val sparkConf = new SparkConf(false) + val mockCtx = buildMockCtx(sparkConf) + val rapidsConf = new RapidsConf(sparkConf) + GpuCoreDumpHandler.executorInit(rapidsConf, mockCtx) + try { + assertResult(null)(GpuCoreDumpHandler.getNamedPipeFile) + } finally { + GpuCoreDumpHandler.shutdown() + } + } + + test("test dump handler enabled no dump") { + val sparkConf = buildSparkConf() + .set(RapidsConf.GPU_COREDUMP_DIR.key, "/tmp") + val mockCtx = buildMockCtx(sparkConf) + val rapidsConf = new RapidsConf(sparkConf) + GpuCoreDumpHandler.executorInit(rapidsConf, mockCtx) + val namedPipeFile = GpuCoreDumpHandler.getNamedPipeFile + try { + assert(waitForIt(WAIT_MSECS)(() => namedPipeFile.exists())) + } finally { + GpuCoreDumpHandler.shutdown() + } + assert(waitForIt(WAIT_MSECS)(() => !namedPipeFile.exists())) + } + + test("test dump handler enabled dump") { + val sparkConf = buildSparkConf() + .set(RapidsConf.GPU_COREDUMP_DIR.key, "/tmp") + val mockCtx = buildMockCtx(sparkConf) + val rapidsConf = new RapidsConf(sparkConf) + GpuCoreDumpHandler.executorInit(rapidsConf, mockCtx) + val namedPipeFile = GpuCoreDumpHandler.getNamedPipeFile + try { + assert(waitForIt(WAIT_MSECS)(() => namedPipeFile.exists())) + fakeDump(namedPipeFile) + verify(mockCtx, timeout(WAIT_MSECS).times(1)).ask(any[GpuCoreDumpMsgStart]()) + verify(mockCtx, timeout(WAIT_MSECS).times(1)).send(any[GpuCoreDumpMsgCompleted]()) + val dumpFile = new File(s"/tmp/gpucore-$APP_ID-$EXECUTOR_ID.nvcudmp.zstd") + assert(dumpFile.exists()) + try { + val codec = TrampolineUtil.createCodec(sparkConf, "zstd") + verifyDump(codec.compressedInputStream(new FileInputStream(dumpFile))) + } finally { + dumpFile.delete() + } + } finally { + GpuCoreDumpHandler.shutdown() + } + assert(waitForIt(WAIT_MSECS)(() => !namedPipeFile.exists())) + } + + test("test dump handler enabled dump no codec") { + val sparkConf = buildSparkConf() + .set(RapidsConf.GPU_COREDUMP_DIR.key, "file:/tmp") + .set(RapidsConf.GPU_COREDUMP_COMPRESS.key, "false") + val mockCtx = buildMockCtx(sparkConf) + val rapidsConf = new RapidsConf(sparkConf) + GpuCoreDumpHandler.executorInit(rapidsConf, mockCtx) + val namedPipeFile = GpuCoreDumpHandler.getNamedPipeFile + try { + assert(waitForIt(WAIT_MSECS)(() => namedPipeFile.exists())) + fakeDump(namedPipeFile) + verify(mockCtx, timeout(WAIT_MSECS).times(1)).ask(any[GpuCoreDumpMsgStart]()) + verify(mockCtx, timeout(WAIT_MSECS).times(1)).send(any[GpuCoreDumpMsgCompleted]()) + val dumpFile = new File(s"/tmp/gpucore-$APP_ID-$EXECUTOR_ID.nvcudmp") + assert(dumpFile.exists()) + try { + verifyDump(new FileInputStream(dumpFile)) + } finally { + dumpFile.delete() + } + } finally { + GpuCoreDumpHandler.shutdown() + } + assert(waitForIt(WAIT_MSECS)(() => !namedPipeFile.exists())) + } + + private def buildMockCtx(conf: SparkConf): PluginContext = { + val ctxConf = conf.clone() + ctxConf.set("spark.app.id", APP_ID) + pipeNum += 1 + val mockCtx = mock[PluginContext] + when(mockCtx.conf).thenReturn(ctxConf) + when(mockCtx.executorID()).thenReturn(EXECUTOR_ID) + when(mockCtx.ask(any[GpuCoreDumpMsgStart]())).thenAnswer(_ => + new SerializableConfiguration(new Configuration(false))) + mockCtx + } + + private def waitForIt(millis: Long)(block: () => Boolean): Boolean = { + val end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis) + while (!block() && System.nanoTime() < end) { + Thread.sleep(10) + } + block() + } + + private def fakeDump(pipeFile: File): Unit = { + val out = new BufferedOutputStream(new FileOutputStream(pipeFile)) + withResource(out) { _ => + var bytesWritten = 0L + while (bytesWritten < FAKE_DUMP_SIZE) { + out.write(FAKE_DUMP_BYTES) + bytesWritten += FAKE_DUMP_BYTES.length + } + } + } + + private def verifyDump(in: InputStream): Unit = { + withResource(in) { _ => + val dumpBytesStr = new String(FAKE_DUMP_BYTES) + var numBytes = 0L + val data = new Array[Byte](FAKE_DUMP_BYTES.length) + while (numBytes < FAKE_DUMP_SIZE) { + var offset = 0 + while (offset < data.length) { + val bytesRead = in.read(data, offset, data.length - offset) + assert(bytesRead > 0) + numBytes += bytesRead + offset += bytesRead + } + assertResult(dumpBytesStr)(new String(data)) + } + assertResult(-1)(in.read()) + } + } + + private def buildSparkConf(): SparkConf = { + // Use a different named pipe for each test, as background threads can linger on old pipes + // and conflict with other coredump tests. + new SparkConf(false) + .set(RapidsConf.GPU_COREDUMP_PIPE_PATTERN.key, "gpucore.test" + pipeNum) + } +} From f3005e4f0eb5a66f1982d197d9c538b68a34c748 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 13 Sep 2023 16:49:49 -0500 Subject: [PATCH 2/8] scalastyle fix --- .../org/apache/spark/sql/rapids/execution/TrampolineUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index 9a732184f55..b59be56a75f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.rapids.execution import org.json4s.JsonAST -import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkUpgradeException, TaskContext} +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkUpgradeException, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.InputMetrics From 6d3848e2fd1915a21bf81651bafc0a88688be3f5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 13 Sep 2023 17:17:18 -0500 Subject: [PATCH 3/8] Fix config visibility --- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 7022e85635b..1a2dc1e36d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -436,6 +436,7 @@ object RapidsConf { val GPU_COREDUMP_COMPRESSION_CODEC = conf("spark.rapids.gpu.coreDump.compression.codec") .doc("The codec used to compress GPU core dumps. Spark provides the codecs " + "lz4, lzf, snappy, and zstd.") + .internal() .stringConf .createWithDefault("zstd") From 320fe1d08816205849840df5f350d9961ee7ff84 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 2 Oct 2023 14:54:05 -0500 Subject: [PATCH 4/8] Wait for in-progress GPU core dumps when shutting down due to fatal error --- .../spark/rapids/GpuCoreDumpHandler.scala | 17 +++++++++++++++++ .../scala/com/nvidia/spark/rapids/Plugin.scala | 1 + 2 files changed, 18 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala index f46e4f9ca81..13ffd5ee203 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala @@ -42,6 +42,7 @@ object GpuCoreDumpHandler extends Logging { private var executor: Option[ExecutorService] = None private var dumpedPath: Option[String] = None private var namedPipeFile: File = _ + private var isDumping: Boolean = false /** * Configures the executor launch environment for GPU core dumps, if applicable. @@ -93,6 +94,19 @@ object GpuCoreDumpHandler extends Logging { } } + /** + * Wait for a GPU dump in progress, if any, to complete + * @param timeoutSecs maximum amount of time to wait before returning + * @return true if the wait timedout, false otherwise + */ + def waitForDump(timeoutSecs: Int): Boolean = { + val endTime = System.nanoTime + TimeUnit.SECONDS.toNanos(timeoutSecs) + while (isDumping && System.nanoTime < endTime) { + Thread.sleep(10) + } + System.nanoTime < endTime + } + def shutdown(): Unit = { executor.foreach { exec => exec.shutdownNow() @@ -137,6 +151,7 @@ object GpuCoreDumpHandler extends Logging { try { logInfo(s"Monitoring ${namedPipe.getAbsolutePath} for GPU core dumps") withResource(new java.io.FileInputStream(namedPipe)) { in => + isDumping = true val appId = pluginCtx.conf.get("spark.app.id") val executorId = pluginCtx.executorID() val dumpPath = new Path(dumpDirPath, @@ -161,6 +176,8 @@ object GpuCoreDumpHandler extends Logging { } catch { case e: Exception => logError("Error copying GPU dump", e) + } finally { + isDumping = false } // Always drain the pipe to avoid blocking the thread that triggers the coredump while (namedPipe.exists()) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 4886603dcad..d10b8336fdd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -492,6 +492,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { case Some(e) if containsCudaFatalException(e) => logError("Stopping the Executor based on exception being a fatal CUDA error: " + s"${ef.toErrorString}") + GpuCoreDumpHandler.waitForDump(timeoutSecs = 60) logGpuDebugInfoAndExit(systemExitCode = 20) case Some(_: CudaException) => logDebug(s"Executor onTaskFailed because of a non-fatal CUDA error: " + From 6d911e97f5047e4795e2d0c8b284f74c86f8a246 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 3 Oct 2023 09:22:04 -0500 Subject: [PATCH 5/8] Move dump messages to API module Signed-off-by: Jason Lowe --- dist/unshimmed-common-from-spark311.txt | 3 +-- .../nvidia/spark/rapids/GpuCoreDumpMsg.scala | 24 +++++++++++++++++++ .../spark/rapids/GpuCoreDumpHandler.scala | 4 ---- 3 files changed, 25 insertions(+), 6 deletions(-) create mode 100644 sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala diff --git a/dist/unshimmed-common-from-spark311.txt b/dist/unshimmed-common-from-spark311.txt index 770b19fc1f5..cf67a19590a 100644 --- a/dist/unshimmed-common-from-spark311.txt +++ b/dist/unshimmed-common-from-spark311.txt @@ -8,11 +8,10 @@ com/nvidia/spark/RapidsUDF* com/nvidia/spark/Retryable* com/nvidia/spark/SQLPlugin* com/nvidia/spark/rapids/ColumnarRdd* +com/nvidia/spark/rapids/GpuColumnVectorUtils* com/nvidia/spark/rapids/ExplainPlan.class com/nvidia/spark/rapids/ExplainPlan$.class com/nvidia/spark/rapids/ExplainPlanBase.class -com/nvidia/spark/rapids/GpuColumnVectorUtils* -com/nvidia/spark/rapids/GpuCoreDumpMsg* com/nvidia/spark/rapids/GpuKryoRegistrator* com/nvidia/spark/rapids/PlanUtils* com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg* diff --git a/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala b/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala new file mode 100644 index 00000000000..8b325bb586a --- /dev/null +++ b/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids + +trait GpuCoreDumpMsg + +/** Serialized message sent from executor to driver when a GPU core dump starts */ +case class GpuCoreDumpMsgStart(executorId: String, dumpPath: String) extends GpuCoreDumpMsg + +/** Serialized message sent from executor to driver when a GPU core dump completes */ +case class GpuCoreDumpMsgCompleted(executorId: String, dumpPath: String) extends GpuCoreDumpMsg diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala index 13ffd5ee203..9962305008d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala @@ -34,10 +34,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.util.SerializableConfiguration -trait GpuCoreDumpMsg -case class GpuCoreDumpMsgStart(executorId: String, dumpPath: String) extends GpuCoreDumpMsg -case class GpuCoreDumpMsgCompleted(executorId: String, dumpPath: String) extends GpuCoreDumpMsg - object GpuCoreDumpHandler extends Logging { private var executor: Option[ExecutorService] = None private var dumpedPath: Option[String] = None From b1d50027be331669042a3abe567c84298aa07a79 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 3 Oct 2023 10:23:51 -0500 Subject: [PATCH 6/8] Add dev documentation for GPU core dumps --- docs/dev/gpu-core-dumps.md | 88 +++++++++++++++++++ .../com/nvidia/spark/rapids/RapidsConf.scala | 5 +- 2 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 docs/dev/gpu-core-dumps.md diff --git a/docs/dev/gpu-core-dumps.md b/docs/dev/gpu-core-dumps.md new file mode 100644 index 00000000000..522c8c621ae --- /dev/null +++ b/docs/dev/gpu-core-dumps.md @@ -0,0 +1,88 @@ +--- +layout: page +title: GPU Core Dumps +nav_order: 9 +parent: Developer Overview +--- +# GPU Core Dumps + +## Overview + +When the GPU segfaults and generates an illegal access exception, it can be difficult to know +what the GPU was doing at the time of the exception. GPU operations execute asynchronously, so what +the CPU was doing at the time the GPU exception was noticed often has little to do with what +triggered the exception. GPU core dumps can provide useful clues when debugging these errors, as +they contain the state of the GPU at the time the exception occurred on the GPU. + +The GPU driver can be configured to write a GPU core dump when the GPU segfaults via environment +variable settings for the process. The challenges for the RAPIDS Accelerator use case are getting +the environment variables set on the executor processes and then copying the GPU core dump file +to a distributed filesystem after it is generated on the local filesystem by the driver. + +## Environment Variables + +The following environment variables are useful for controlling GPU core dumps. See the +[GPU core dump support section of the CUDA-GDB documentation](https://docs.nvidia.com/cuda/cuda-gdb/index.html#gpu-core-dump-support) +for more details. + +### `CUDA_ENABLE_COREDUMP_ON_EXCEPTION` + +Set to `1` to trigger a GPU core dump on a GPU exception. + +### `CUDA_COREDUMP_FILE` + +The filename to use for the GPU core dump file. Relative paths to the process current working +directory are supported. The pattern `%h` in the filename will be expanded to the hostname, and +the pattern `%p` will be expanded to the process ID. If the filename corresponds with a named pipe, +the GPU core dump data will be written to the named pipe by the GPU driver. + +### `CUDA_ENABLE_LIGHTWEIGHT_COREDUMP` + +Set to `1` to generate a lightweight core dump that omits the local, shared, and global memory +dumps. Disabled by default. Lightweight core dumps still show the code location that triggered +the exception and therefore can be a good option when one only needs to know what kernel(s) were +running at the time of the exception and which one triggered the exception. + +### `CUDA_ENABLE_CPU_COREDUMP_ON_EXCEPTION` + +Set to `0` to prevent the GPU driver from causing a CPU core dump of the process after the GPU +core dump is written. Enabled by default. + +### `CUDA_COREDUMP_SHOW_PROGRESS` + +Set to `1` to print progress messages to the process stderr as the GPU core dump is generated. This +is only supported on newer GPU drivers (e.g.: those that are CUDA 12 compatible). + +## YARN Log Aggregation + +The log aggregation feature of YARN can be leveraged to copy GPU core dumps to the same place that +YARN collects container logs. When enabled, YARN will collect all files in a container's log +directory to a distributed filesystem location. YARN will automatically expand the pattern +`` in a container's environment variables to the container's log directory which is useful +when configuring `CUDA_COREDUMP_FILE` to place the GPU core dump in the appropriate place for +log aggregation. Note that YARN log aggregation may be configured to have relatively low file size +limits which may interfere with successful collection of large GPU core dump files. + +The following Spark configuration settings will enable GPU lightweight core dumps and have the +core dump files placed in the container log directory: +```text +spark.executorEnv.CUDA_ENABLE_COREDUMP_ON_EXCEPTION=1 +spark.executorEnv.CUDA_ENABLE_LIGHTWEIGHT_COREDUMP=1 +spark.executorEnv.CUDA_COREDUMP_FILE="/executor-%h-%p.nvcudmp" +``` + +## Simplified Core Dump Handling + +There is rudimentary support for simplified setup of GPU core dumps in the RAPIDS Accelerator. +This currently only works on Spark standalone clusters, since there is currently no way for a driver +plugin to programmatically override executor environment variable settings for Spark-on-YARN or +Spark-on-Kubernetes. In the future with a driver that is compatible with CUDA 12.1 or later, +the RAPIDS Accelerator could leverage GPU driver APIs to programmatically configure GPU core dump +support on executor startup. + +To enable the simplified core dump handling, set `spark.rapids.gpu.coreDump.dir` to a directory to +use for GPU core dumps. Distributed filesystem URIs are supported. This leverages named pipes and +background threads to copy the GPU core dump data to the distributed filesystem. Note that anything +that causes early, abrupt termination of the process such as throwing from a C++ destructor will +often terminate the process before the dump write can be completed. These abrupt terminations should +be fixed when discovered. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index e80b53b5541..e0294dd09ce 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -413,8 +413,9 @@ object RapidsConf { val GPU_COREDUMP_DIR = conf("spark.rapids.gpu.coreDump.dir") .doc("The URI to a directory where a GPU core dump will be created if the GPU encounters " + - "an exception. The filename will be of the form gpucore--.nvcudmp, " + - "where is the Spark application ID and is the executor ID.") + "an exception. The URI can reference a distributed filesystem. The filename will be of the " + + "form gpucore--.nvcudmp, where is the Spark application ID and " + + " is the executor ID.") .internal() .stringConf .createOptional From 6eebeacd4d5de8fe11f363771a9259e06bcc5f65 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 3 Oct 2023 10:29:55 -0500 Subject: [PATCH 7/8] Add TODO for leveraging CUDA 12.1 core dump APIs --- docs/dev/gpu-core-dumps.md | 1 + .../scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/docs/dev/gpu-core-dumps.md b/docs/dev/gpu-core-dumps.md index 522c8c621ae..4617d49abcc 100644 --- a/docs/dev/gpu-core-dumps.md +++ b/docs/dev/gpu-core-dumps.md @@ -65,6 +65,7 @@ limits which may interfere with successful collection of large GPU core dump fil The following Spark configuration settings will enable GPU lightweight core dumps and have the core dump files placed in the container log directory: + ```text spark.executorEnv.CUDA_ENABLE_COREDUMP_ON_EXCEPTION=1 spark.executorEnv.CUDA_ENABLE_LIGHTWEIGHT_COREDUMP=1 diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala index 9962305008d..a3a32424a41 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala @@ -45,6 +45,10 @@ object GpuCoreDumpHandler extends Logging { * Should only be called from the driver on driver startup. */ def driverInit(sc: SparkContext, conf: RapidsConf): Unit = { + // This only works in practice on Spark standalone clusters. It's too late to influence the + // executor environment for Spark-on-YARN or Spark-on-k8s. + // TODO: Leverage CUDA 12.1 core dump APIs in the executor to programmatically set this up + // on executor startup. https://github.com/NVIDIA/spark-rapids/issues/9370 conf.gpuCoreDumpDir.foreach { _ => TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_COREDUMP_ON_EXCEPTION", "1") TrampolineUtil.setExecutorEnv(sc, "CUDA_ENABLE_CPU_COREDUMP_ON_EXCEPTION", "0") From b4aeffd5f2bc16c9e73c273eea35d914560b76d7 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 3 Oct 2023 14:17:00 -0500 Subject: [PATCH 8/8] Add GPU core dump failure log on driver --- .../nvidia/spark/rapids/GpuCoreDumpMsg.scala | 3 +++ .../spark/rapids/GpuCoreDumpHandler.scala | 11 ++++++-- .../rapids/GpuCoreDumpHandlerSuite.scala | 26 ++++++++++++++++++- 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala b/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala index 8b325bb586a..0ffb2b223a2 100644 --- a/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala +++ b/sql-plugin-api/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpMsg.scala @@ -22,3 +22,6 @@ case class GpuCoreDumpMsgStart(executorId: String, dumpPath: String) extends Gpu /** Serialized message sent from executor to driver when a GPU core dump completes */ case class GpuCoreDumpMsgCompleted(executorId: String, dumpPath: String) extends GpuCoreDumpMsg + +/** Serialized message sent from executor to driver when a GPU core dump fails */ +case class GpuCoreDumpMsgFailed(executorId: String, error: String) extends GpuCoreDumpMsg diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala index a3a32424a41..77c9a9987e8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala @@ -15,7 +15,7 @@ */ package com.nvidia.spark.rapids -import java.io.File +import java.io.{File, PrintWriter} import java.lang.management.ManagementFactory import java.nio.file.Files import java.util.concurrent.{Executors, ExecutorService, TimeUnit} @@ -23,6 +23,7 @@ import java.util.concurrent.{Executors, ExecutorService, TimeUnit} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.shims.NullOutputStreamShim import org.apache.commons.io.IOUtils +import org.apache.commons.io.output.StringBuilderWriter import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.{FsAction, FsPermission} @@ -124,6 +125,9 @@ object GpuCoreDumpHandler extends Logging { case GpuCoreDumpMsgCompleted(executorId, dumpPath) => logError(s"Executor $executorId wrote a GPU core dump to $dumpPath") null + case GpuCoreDumpMsgFailed(executorId, error) => + logError(s"Executor $executorId failed to write a GPU core dump: $error") + null case m => throw new IllegalStateException(s"Unexpected GPU core dump msg: $m") } @@ -148,12 +152,12 @@ object GpuCoreDumpHandler extends Logging { dumpDirPath: Path, codec: Option[CompressionCodec], suffix: String): Unit = { + val executorId = pluginCtx.executorID() try { logInfo(s"Monitoring ${namedPipe.getAbsolutePath} for GPU core dumps") withResource(new java.io.FileInputStream(namedPipe)) { in => isDumping = true val appId = pluginCtx.conf.get("spark.app.id") - val executorId = pluginCtx.executorID() val dumpPath = new Path(dumpDirPath, s"gpucore-$appId-$executorId.nvcudmp$suffix") logError(s"Generating GPU core dump at $dumpPath") @@ -176,6 +180,9 @@ object GpuCoreDumpHandler extends Logging { } catch { case e: Exception => logError("Error copying GPU dump", e) + val writer = new StringBuilderWriter() + e.printStackTrace(new PrintWriter(writer)) + pluginCtx.send(GpuCoreDumpMsgFailed(executorId, s"$e\n${writer.toString}")) } finally { isDumping = false } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala index c63c94bedbb..da479694379 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoreDumpHandlerSuite.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream, InputStream} +import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream, InputStream, IOException} import java.util.concurrent.TimeUnit import com.nvidia.spark.rapids.Arm.withResource @@ -118,6 +118,30 @@ class GpuCoreDumpHandlerSuite extends AnyFunSuite { assert(waitForIt(WAIT_MSECS)(() => !namedPipeFile.exists())) } + test("test dump handler failure") { + val sparkConf = buildSparkConf() + .set(RapidsConf.GPU_COREDUMP_DIR.key, "thiswillfail://foo/bar") + .set(RapidsConf.GPU_COREDUMP_COMPRESS.key, "false") + val mockCtx = buildMockCtx(sparkConf) + val rapidsConf = new RapidsConf(sparkConf) + GpuCoreDumpHandler.executorInit(rapidsConf, mockCtx) + val namedPipeFile = GpuCoreDumpHandler.getNamedPipeFile + try { + assert(waitForIt(WAIT_MSECS)(() => namedPipeFile.exists())) + try { + fakeDump(namedPipeFile) + } catch { + case e: IOException if e.getMessage.contains("Broken pipe") => + // broken pipe is expected when reader crashes + } + verify(mockCtx, timeout(WAIT_MSECS).times(1)).ask(any[GpuCoreDumpMsgStart]()) + verify(mockCtx, timeout(WAIT_MSECS).times(1)).send(any[GpuCoreDumpMsgFailed]()) + } finally { + GpuCoreDumpHandler.shutdown() + } + assert(waitForIt(WAIT_MSECS)(() => !namedPipeFile.exists())) + } + private def buildMockCtx(conf: SparkConf): PluginContext = { val ctxConf = conf.clone() ctxConf.set("spark.app.id", APP_ID)