Add support for dumping write data to try and reproduce error cases #11864

Merged: 7 commits, Dec 18, 2024
The diff below shows changes from 2 of the 7 commits.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -219,8 +219,9 @@ class GpuOptimisticTransaction
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

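The same call-site change repeats in each of the Delta Lake version shims below: the two positional arguments become named arguments, and the new baseDebugOutputPath parameter is wired to rapidsConf.outputDebugDumpPrefix.
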
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -219,8 +219,9 @@ class GpuOptimisticTransaction
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -241,8 +241,9 @@ class GpuOptimisticTransaction
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -241,8 +241,9 @@ class GpuOptimisticTransaction
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -243,8 +243,9 @@ class GpuOptimisticTransaction
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -261,8 +261,9 @@ class GpuOptimisticTransaction(
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -258,8 +258,9 @@ class GpuOptimisticTransaction(
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
  * in the Delta Lake project at https://github.com/delta-io/delta.
@@ -259,8 +259,9 @@ class GpuOptimisticTransaction(
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

@@ -259,8 +259,9 @@ class GpuOptimisticTransaction(
       bucketSpec = None,
       statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers,
       options = options,
-      rapidsConf.stableSort,
-      rapidsConf.concurrentWriterPartitionFlushSize)
+      useStableSort = rapidsConf.stableSort,
+      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
+      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.

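For orientation, here is a hedged sketch of the writer entry point these call sites feed into. The real signature lives in the plugin's GpuFileFormatWriter, which is not part of this diff, so the surrounding shape is an assumption; only the named parameters above are confirmed by the call sites.

```scala
// Sketch only: a simplified stand-in for the real write entry point, not the
// actual GpuFileFormatWriter API. It shows the intent of the new parameter:
// baseDebugOutputPath is None unless spark.rapids.sql.output.debug.dumpPrefix
// is set, so the dump machinery stays disabled by default.
object GpuFileFormatWriterSketch {
  def write(
      options: Map[String, String],
      useStableSort: Boolean,
      concurrentWriterPartitionFlushSize: Long,
      baseDebugOutputPath: Option[String]): Unit = {
    baseDebugOutputPath.foreach { prefix =>
      // Presumably a per-task dump path is derived from this prefix and handed
      // to each ColumnarOutputWriter as debugOutputPath (see newInstance below).
      println(s"write-data debug dumps enabled under prefix $prefix")
    }
  }
}
```
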
@@ -16,11 +16,11 @@
 
 package com.nvidia.spark.rapids
 
-import java.io.OutputStream
+import java.io.{BufferedOutputStream, DataOutputStream, OutputStream}
 
 import scala.collection.mutable
 
-import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, TableWriter}
+import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange, TableWriter}
 import com.nvidia.spark.Retryable
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -61,7 +61,8 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
   def newInstance(
       path: String,
       dataSchema: StructType,
-      context: TaskAttemptContext): ColumnarOutputWriter
+      context: TaskAttemptContext,
+      debugOutputPath: Option[String]): ColumnarOutputWriter
 }
 
 /**
@@ -73,9 +74,43 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
     dataSchema: StructType,
     rangeName: String,
     includeRetry: Boolean,
+    debugDumpPath: Option[String],
     holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging {
 
   protected val tableWriter: TableWriter
+  private lazy val debugDumpOutputStream: Option[OutputStream] = try {
+    debugDumpPath.map { path =>
+      val tc = TaskContext.get()
+      logWarning(s"DEBUG FILE OUTPUT $rangeName FOR " +
+        s"STAGE ${tc.stageId()} TASK ${tc.taskAttemptId()} is $path")
+      val hadoopPath = new Path(path)
+      val fs = hadoopPath.getFileSystem(conf)
+      new DataOutputStream(new BufferedOutputStream(fs.create(hadoopPath, false)))
+    }
+  } catch {
+    case e: Exception =>
+      logError(s"Could not create debug output stream $debugDumpPath", e)
+      None
+  }
+
+  /**
+   * Write out a debug batch to the debug output stream if it is configured.
+   * If it is not configured, this is a noop. If an exception occurs, it is
+   * logged but otherwise ignored.
+   */
+  private def debugWriteBatch(batch: ColumnarBatch): Unit = {
+    debugDumpOutputStream.foreach { output =>
+      try {
+        withResource(GpuColumnVector.from(batch)) { table =>
+          JCudfSerialization.writeToStream(table, output, 0, table.getRowCount)
+        }
+        output.flush()
+      } catch {
+        case t: Throwable =>
+          logError(s"Could not write debug table $debugDumpPath", t)
+      }
+    }
+  }
 
   protected val conf: Configuration = context.getConfiguration
 
@@ -219,6 +254,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
     // where corrupt files can be written if nothing is encoded via the writer.
     anythingWritten = true
 
+    debugWriteBatch(batch)
     // tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it
     // by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the
     // output stream just yet.
@@ -239,6 +275,9 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
     GpuSemaphore.releaseIfNecessary(TaskContext.get())
     writeBufferedData()
     outputStream.close()
+    debugDumpOutputStream.foreach { os =>
+      os.close()
+    }
   }
 
   /**

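Because the dump uses the JCudfSerialization format, a dumped file can be pulled back into a cudf Table for offline debugging. A minimal read-back sketch, assuming a local file and that JCudfSerialization.readTableFrom returns an AutoCloseable table/row-count pair as in current cudf Java builds (verify against your cudf version):

```scala
import java.io.{BufferedInputStream, DataInputStream, FileInputStream}

import ai.rapids.cudf.JCudfSerialization

// Read the first serialized batch back from a debug dump file. Each call to
// writeToStream above appends one batch, so repeated readTableFrom calls on the
// same stream would walk the batches in write order.
def readFirstDumpedBatch(path: String): Unit = {
  val in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)))
  try {
    val pair = JCudfSerialization.readTableFrom(in) // assumed API, see above
    try {
      val table = pair.getTable
      if (table != null) {
        println(s"read ${table.getRowCount} rows, ${table.getNumberOfColumns} columns")
      }
    } finally {
      pair.close()
    }
  } finally {
    in.close()
  }
}
```
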
@@ -368,7 +368,8 @@ final class InsertIntoHadoopFsRelationCommandMeta(
       cmd.fileIndex,
       cmd.outputColumnNames,
       conf.stableSort,
-      conf.concurrentWriterPartitionFlushSize)
+      conf.concurrentWriterPartitionFlushSize,
+      conf.outputDebugDumpPrefix)
   }
 }
 
@@ -280,10 +280,11 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
   override def newInstance(
       path: String,
       dataSchema: StructType,
-      context: TaskAttemptContext): ColumnarOutputWriter = {
+      context: TaskAttemptContext,
+      debugOutputPath: Option[String]): ColumnarOutputWriter = {
     new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
       dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
-      holdGpuBetweenBatches)
+      holdGpuBetweenBatches, debugOutputPath)
   }
 
   override def getFileExtension(context: TaskAttemptContext): String = {
@@ -306,8 +307,10 @@ class GpuParquetWriter(
     timestampRebaseMode: DateTimeRebaseMode,
     context: TaskAttemptContext,
     parquetFieldIdEnabled: Boolean,
-    holdGpuBetweenBatches: Boolean)
-  extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, holdGpuBetweenBatches) {
+    holdGpuBetweenBatches: Boolean,
+    debugDumpPath: Option[String])
+  extends ColumnarOutputWriter(context, dataSchema, "Parquet", true,
+    debugDumpPath, holdGpuBetweenBatches) {
   override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
     val cols = GpuColumnVector.extractBases(batch)
     cols.foreach { col =>

sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (11 additions, 0 deletions)
@@ -1682,6 +1682,15 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(false)
 
+  val OUTPUT_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.output.debug.dumpPrefix")
+    .doc("A path prefix where data that is intended to be written out as the result " +
+      "of a query should be dumped for debugging. The format is based on " +
+      "JCudfSerialization and attempts to capture the underlying table so that " +
+      "errors in the output format can be reproduced.")
+    .internal()
+    .stringConf
+    .createOptional
+
   val PARQUET_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.parquet.debug.dumpPrefix")
     .doc("A path prefix where Parquet split file data is dumped for debugging.")
     .internal()
@@ -2883,6 +2892,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val maxGpuColumnSizeBytes: Long = get(MAX_GPU_COLUMN_SIZE_BYTES)
 
+  lazy val outputDebugDumpPrefix: Option[String] = get(OUTPUT_DEBUG_DUMP_PREFIX)
+
   lazy val parquetDebugDumpPrefix: Option[String] = get(PARQUET_DEBUG_DUMP_PREFIX)
 
   lazy val parquetDebugDumpAlways: Boolean = get(PARQUET_DEBUG_DUMP_ALWAYS)

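The new conf is internal and disabled by default. A minimal usage sketch, assuming a spark-shell session with the RAPIDS plugin enabled; the paths are illustrative, and depending on when the plugin reads the conf it may need to be supplied at startup via --conf instead of set at runtime:

```scala
// Point the dump prefix at a writable location, then run the problematic write.
// The write path derives per-writer dump files from this prefix.
spark.conf.set("spark.rapids.sql.output.debug.dumpPrefix", "/tmp/rapids-write-dump/")
spark.range(1000).write.mode("overwrite").parquet("/tmp/out")
```
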
@@ -181,16 +181,18 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat
 
       override def newInstance(path: String,
           dataSchema: StructType,
-          context: TaskAttemptContext): ColumnarOutputWriter = {
-        new GpuHiveParquetWriter(path, dataSchema, context, compressionType)
+          context: TaskAttemptContext,
+          debugOutputPath: Option[String]): ColumnarOutputWriter = {
+        new GpuHiveParquetWriter(path, dataSchema, context, compressionType, debugOutputPath)
       }
     }
   }
 }
 
 class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
-    context: TaskAttemptContext, compType: CompressionType)
-  extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) {
+    context: TaskAttemptContext, compType: CompressionType,
+    debugOutputPath: Option[String])
+  extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true, debugOutputPath) {
 
   override protected val tableWriter: CudfTableWriter = {
     val optionsBuilder = SchemaUtils
@@ -217,17 +219,19 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging {
 
       override def newInstance(path: String,
           dataSchema: StructType,
-          context: TaskAttemptContext): ColumnarOutputWriter = {
-        new GpuHiveTextWriter(path, dataSchema, context)
+          context: TaskAttemptContext,
+          debugOutputPath: Option[String]): ColumnarOutputWriter = {
+        new GpuHiveTextWriter(path, dataSchema, context, debugOutputPath)
       }
     }
   }
 }
 
 class GpuHiveTextWriter(override val path: String,
     dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends ColumnarOutputWriter(context, dataSchema, "HiveText", false) {
+    context: TaskAttemptContext,
+    debugOutputPath: Option[String])
+  extends ColumnarOutputWriter(context, dataSchema, "HiveText", false, debugOutputPath) {
 
   /**
    * This reformats columns, to iron out inconsistencies between

@@ -43,6 +43,7 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsHiveFile {
       fileFormat: ColumnarFileFormat,
       outputLocation: String,
       forceHiveHashForBucketing: Boolean,
+      baseDebugOutputPath: Option[String],
       customPartitionLocations: Map[TablePartitionSpec,String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil,
       bucketSpec: Option[BucketSpec] = None,
@@ -67,7 +68,8 @@
       options = options,
       useStableSort = false, // TODO: Fetch from RapidsConf.
       forceHiveHashForBucketing = forceHiveHashForBucketing,
-      concurrentWriterPartitionFlushSize = 0L // TODO: Fetch from RapidsConf.
+      concurrentWriterPartitionFlushSize = 0L, // TODO: Fetch from RapidsConf.
+      baseDebugOutputPath = baseDebugOutputPath
     )
   }
 }