From 0e27736b72ad0012d1d4a7639040b0278bfdedcd Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 11 Dec 2024 15:30:44 -0600 Subject: [PATCH 1/4] Add support for dumping write data to try and reproduce error cases Signed-off-by: Robert (Bobby) Evans --- .../delta20x/GpuOptimisticTransaction.scala | 7 +-- .../delta21x/GpuOptimisticTransaction.scala | 7 +-- .../delta22x/GpuOptimisticTransaction.scala | 7 +-- .../delta23x/GpuOptimisticTransaction.scala | 7 +-- .../delta24x/GpuOptimisticTransaction.scala | 7 +-- .../rapids/GpuOptimisticTransaction.scala | 7 +-- .../rapids/GpuOptimisticTransaction.scala | 5 ++- .../rapids/GpuOptimisticTransaction.scala | 7 +-- .../rapids/GpuOptimisticTransaction.scala | 5 ++- .../spark/rapids/ColumnarOutputWriter.scala | 45 +++++++++++++++++-- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../spark/rapids/GpuParquetFileFormat.scala | 11 +++-- .../com/nvidia/spark/rapids/RapidsConf.scala | 11 +++++ .../sql/hive/rapids/GpuHiveFileFormat.scala | 20 +++++---- .../sql/hive/rapids/GpuSaveAsHiveFile.scala | 4 +- .../sql/rapids/GpuFileFormatDataWriter.scala | 34 +++++++++++--- ...GpuInsertIntoHadoopFsRelationCommand.scala | 6 ++- .../spark/sql/rapids/GpuOrcFileFormat.scala | 10 +++-- ...aSourceTableAsSelectCommandMetaShims.scala | 3 +- .../rapids/shims/GpuInsertIntoHiveTable.scala | 11 +++-- .../spark/sql/rapids/GpuDataSource.scala | 11 +++-- .../sql/rapids/GpuFileFormatWriter.scala | 17 ++++--- ...eDataSourceTableAsSelectCommandShims.scala | 5 ++- .../rapids/shims/GpuInsertIntoHiveTable.scala | 10 +++-- .../execution/datasources/GpuWriteFiles.scala | 9 ++-- .../sql/rapids/GpuFileFormatWriter.scala | 23 ++++++---- .../rapids/GpuFileFormatDataWriterSuite.scala | 17 ++++--- 27 files changed, 216 insertions(+), 93 deletions(-) diff --git a/delta-lake/delta-20x/src/main/scala/org/apache/spark/sql/delta/rapids/delta20x/GpuOptimisticTransaction.scala b/delta-lake/delta-20x/src/main/scala/org/apache/spark/sql/delta/rapids/delta20x/GpuOptimisticTransaction.scala index 282368fc95b..ee0fc6f1cae 100644 --- a/delta-lake/delta-20x/src/main/scala/org/apache/spark/sql/delta/rapids/delta20x/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-20x/src/main/scala/org/apache/spark/sql/delta/rapids/delta20x/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -219,8 +219,9 @@ class GpuOptimisticTransaction bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. 
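The same call-site change repeats across the remaining Delta Lake shims below: the positional stableSort and concurrentWriterPartitionFlushSize arguments become named arguments so the new, optional baseDebugOutputPath can be appended. A minimal sketch of the resulting call shape; the parameter names ahead of options are assumptions drawn from the existing writer API (only the trailing named arguments appear in the diff), and the surrounding transaction code is elided:

    GpuFileFormatWriter.write(
      sparkSession = spark,                       // assumed names for the elided
      plan = gpuPhysicalPlan,                     // arguments; only the last three
      fileFormat = gpuFileFormat,                 // are shown in the diff hunks
      committer = committer,
      outputSpec = outputSpec,
      hadoopConf = hadoopConf,
      partitionColumns = partitioningColumns,
      bucketSpec = None,
      statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
      options = options,
      useStableSort = rapidsConf.stableSort,
      concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize,
      baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix)  // new: None disables dumping
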
diff --git a/delta-lake/delta-21x/src/main/scala/org/apache/spark/sql/delta/rapids/delta21x/GpuOptimisticTransaction.scala b/delta-lake/delta-21x/src/main/scala/org/apache/spark/sql/delta/rapids/delta21x/GpuOptimisticTransaction.scala index 9acded983d0..7433594aa73 100644 --- a/delta-lake/delta-21x/src/main/scala/org/apache/spark/sql/delta/rapids/delta21x/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-21x/src/main/scala/org/apache/spark/sql/delta/rapids/delta21x/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -219,8 +219,9 @@ class GpuOptimisticTransaction bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/delta-lake/delta-22x/src/main/scala/org/apache/spark/sql/delta/rapids/delta22x/GpuOptimisticTransaction.scala b/delta-lake/delta-22x/src/main/scala/org/apache/spark/sql/delta/rapids/delta22x/GpuOptimisticTransaction.scala index 3ec00c5cb62..cb990ccc16d 100644 --- a/delta-lake/delta-22x/src/main/scala/org/apache/spark/sql/delta/rapids/delta22x/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-22x/src/main/scala/org/apache/spark/sql/delta/rapids/delta22x/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -241,8 +241,9 @@ class GpuOptimisticTransaction bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/delta-lake/delta-23x/src/main/scala/org/apache/spark/sql/delta/rapids/delta23x/GpuOptimisticTransaction.scala b/delta-lake/delta-23x/src/main/scala/org/apache/spark/sql/delta/rapids/delta23x/GpuOptimisticTransaction.scala index 38ee8a786c0..905329a6b01 100644 --- a/delta-lake/delta-23x/src/main/scala/org/apache/spark/sql/delta/rapids/delta23x/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-23x/src/main/scala/org/apache/spark/sql/delta/rapids/delta23x/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. 
@@ -241,8 +241,9 @@ class GpuOptimisticTransaction bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/delta-lake/delta-24x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuOptimisticTransaction.scala b/delta-lake/delta-24x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuOptimisticTransaction.scala index bc95a656271..fadc9bf99a8 100644 --- a/delta-lake/delta-24x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-24x/src/main/scala/org/apache/spark/sql/delta/rapids/delta24x/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -243,8 +243,9 @@ class GpuOptimisticTransaction bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 9dff950ca5b..f4debdea1c1 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -261,8 +261,9 @@ class GpuOptimisticTransaction( bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. 
diff --git a/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index fb9d3840917..4790e643291 100644 --- a/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -258,8 +258,9 @@ class GpuOptimisticTransaction( bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/delta-lake/delta-spark341db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark341db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 3e836056b6d..e1e5f28dec6 100644 --- a/delta-lake/delta-spark341db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark341db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. @@ -259,8 +259,9 @@ class GpuOptimisticTransaction( bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. 
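Before the writer-side changes below, a note on intended usage: the dump is driven entirely by the internal spark.rapids.sql.output.debug.dumpPrefix setting added later in this series, so reproducing a bad output file should only require setting the prefix before re-running the failing write. A hypothetical session sketch (the config key comes from this patch; the paths and DataFrame are illustrative):

    // Point the debug dump at a scratch location; leaving it unset (the default)
    // keeps the feature disabled.
    spark.conf.set("spark.rapids.sql.output.debug.dumpPrefix",
      "hdfs://scratch/rapids-write-debug")                     // illustrative path

    // Re-run the failing write. Each GPU writer task should also produce a
    // DEBUG_*.debug side file under the prefix (exact naming per the
    // GpuFileFormatDataWriter changes in this series), holding the batches handed
    // to the output writer in JCudfSerialization format.
    failingDf.write.mode("overwrite").parquet("hdfs://warehouse/events")  // illustrative
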
diff --git a/delta-lake/delta-spark350db143/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark350db143/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index e06aba55487..b49c5ebaa30 100644 --- a/delta-lake/delta-spark350db143/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark350db143/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -259,8 +259,9 @@ class GpuOptimisticTransaction( bucketSpec = None, statsTrackers = optionalStatsTracker.toSeq ++ identityTracker.toSeq ++ statsTrackers, options = options, - rapidsConf.stableSort, - rapidsConf.concurrentWriterPartitionFlushSize) + useStableSort = rapidsConf.stableSort, + concurrentWriterPartitionFlushSize = rapidsConf.concurrentWriterPartitionFlushSize, + baseDebugOutputPath = rapidsConf.outputDebugDumpPrefix) } catch { case s: SparkException => // Pull an InvariantViolationException up to the top level if it was the root cause. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index df62683d346..a3231177e9e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -16,11 +16,11 @@ package com.nvidia.spark.rapids -import java.io.OutputStream +import java.io.{BufferedOutputStream, DataOutputStream, OutputStream} import scala.collection.mutable -import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, TableWriter} +import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange, TableWriter} import com.nvidia.spark.Retryable import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -61,7 +61,8 @@ abstract class ColumnarOutputWriterFactory extends Serializable { def newInstance( path: String, dataSchema: StructType, - context: TaskAttemptContext): ColumnarOutputWriter + context: TaskAttemptContext, + debugOutputPath: Option[String]): ColumnarOutputWriter } /** @@ -73,9 +74,43 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, dataSchema: StructType, rangeName: String, includeRetry: Boolean, + debugDumpPath: Option[String], holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging { protected val tableWriter: TableWriter + private lazy val debugDumpOutputStream: Option[OutputStream] = try { + debugDumpPath.map { path => + val tc = TaskContext.get() + logWarning(s"DEBUG FILE OUTPUT $rangeName FOR " + + s"STAGE ${tc.stageId()} TASK ${tc.taskAttemptId()} is $path") + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(conf) + new DataOutputStream(new BufferedOutputStream(fs.create(hadoopPath, false))) + } + } catch { + case e: Exception => + logError(s"Could Not Write Debug Table $debugDumpPath", e) + None + } + + /** + * Write out a debug batch to the debug output stream if it is configured. + * If it is not configured, this is a noop. If an exception happens the exception + * is ignored, but it is logged. 
+ */ + private def debugWriteBatch(batch: ColumnarBatch): Unit = { + debugDumpOutputStream.foreach { output => + try { + withResource(GpuColumnVector.from(batch)) { table => + JCudfSerialization.writeToStream(table, output, 0, table.getRowCount) + } + output.flush() + } catch { + case t: Throwable => + logError(s"Could Not Write Debug Table $debugDumpPath", t) + } + } + } protected val conf: Configuration = context.getConfiguration @@ -219,6 +254,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, // where corrupt files can be written if nothing is encoded via the writer. anythingWritten = true + debugWriteBatch(batch) // tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it // by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the // output stream just yet. @@ -239,6 +275,9 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, GpuSemaphore.releaseIfNecessary(TaskContext.get()) writeBufferedData() outputStream.close() + debugDumpOutputStream.foreach { os => + os.close() + } } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 07b2d022f67..08592c8d244 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -368,7 +368,8 @@ final class InsertIntoHadoopFsRelationCommandMeta( cmd.fileIndex, cmd.outputColumnNames, conf.stableSort, - conf.concurrentWriterPartitionFlushSize) + conf.concurrentWriterPartitionFlushSize, + conf.outputDebugDumpPrefix) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 2b5f246e56a..cb1b3d4c610 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -280,10 +280,11 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { override def newInstance( path: String, dataSchema: StructType, - context: TaskAttemptContext): ColumnarOutputWriter = { + context: TaskAttemptContext, + debugOutputPath: Option[String]): ColumnarOutputWriter = { new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString, dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled, - holdGpuBetweenBatches) + holdGpuBetweenBatches, debugOutputPath) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -306,8 +307,10 @@ class GpuParquetWriter( timestampRebaseMode: DateTimeRebaseMode, context: TaskAttemptContext, parquetFieldIdEnabled: Boolean, - holdGpuBetweenBatches: Boolean) - extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, holdGpuBetweenBatches) { + holdGpuBetweenBatches: Boolean, + debugDumpPath: Option[String]) + extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, + debugDumpPath, holdGpuBetweenBatches) { override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = { val cols = GpuColumnVector.extractBases(batch) cols.foreach { col => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index c34e461b258..c4199e3ea75 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1682,6 +1682,15 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) + val OUTPUT_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.output.debug.dumpPrefix") + .doc("A path prefix where data that is intended to be written out as the result " + + "of a query should be dumped for debugging. The format of this is based on " + + "JCudfSerialization and is trying to capture the underlying table so that if " + + "there are errors in the output format we can try to recreate it.") + .internal() + .stringConf + .createOptional + val PARQUET_DEBUG_DUMP_PREFIX = conf("spark.rapids.sql.parquet.debug.dumpPrefix") .doc("A path prefix where Parquet split file data is dumped for debugging.") .internal() @@ -2883,6 +2892,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val maxGpuColumnSizeBytes: Long = get(MAX_GPU_COLUMN_SIZE_BYTES) + lazy val outputDebugDumpPrefix: Option[String] = get(OUTPUT_DEBUG_DUMP_PREFIX) + lazy val parquetDebugDumpPrefix: Option[String] = get(PARQUET_DEBUG_DUMP_PREFIX) lazy val parquetDebugDumpAlways: Boolean = get(PARQUET_DEBUG_DUMP_ALWAYS) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index 3b5244e5c79..78742ec461e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -181,16 +181,18 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFo override def newInstance(path: String, dataSchema: StructType, - context: TaskAttemptContext): ColumnarOutputWriter = { - new GpuHiveParquetWriter(path, dataSchema, context, compressionType) + context: TaskAttemptContext, + debugOutputPath: Option[String]): ColumnarOutputWriter = { + new GpuHiveParquetWriter(path, dataSchema, context, compressionType, debugOutputPath) } } } } class GpuHiveParquetWriter(override val path: String, dataSchema: StructType, - context: TaskAttemptContext, compType: CompressionType) - extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true) { + context: TaskAttemptContext, compType: CompressionType, + debugOutputPath: Option[String]) + extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true, debugOutputPath) { override protected val tableWriter: CudfTableWriter = { val optionsBuilder = SchemaUtils @@ -217,8 +219,9 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging { override def newInstance(path: String, dataSchema: StructType, - context: TaskAttemptContext): ColumnarOutputWriter = { - new GpuHiveTextWriter(path, dataSchema, context) + context: TaskAttemptContext, + debugOutputPath: Option[String]): ColumnarOutputWriter = { + new GpuHiveTextWriter(path, dataSchema, context, debugOutputPath) } } } @@ -226,8 +229,9 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging { class GpuHiveTextWriter(override val path: String, dataSchema: StructType, - context: TaskAttemptContext) - extends ColumnarOutputWriter(context, dataSchema, "HiveText", false) { + context: TaskAttemptContext, + debugOutputPath: Option[String]) + extends ColumnarOutputWriter(context, dataSchema, "HiveText", false, debugOutputPath) { /** * This reformats columns, to iron out inconsistencies between diff --git 
a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuSaveAsHiveFile.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuSaveAsHiveFile.scala index 08118cc11a0..75424714673 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuSaveAsHiveFile.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuSaveAsHiveFile.scala @@ -43,6 +43,7 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsH fileFormat: ColumnarFileFormat, outputLocation: String, forceHiveHashForBucketing: Boolean, + baseDebugOutputPath: Option[String], customPartitionLocations: Map[TablePartitionSpec,String] = Map.empty, partitionAttributes: Seq[Attribute] = Nil, bucketSpec: Option[BucketSpec] = None, @@ -67,7 +68,8 @@ private[hive] trait GpuSaveAsHiveFile extends GpuDataWritingCommand with SaveAsH options = options, useStableSort = false, // TODO: Fetch from RapidsConf. forceHiveHashForBucketing = forceHiveHashForBucketing, - concurrentWriterPartitionFlushSize = 0L // TODO: Fetch from RapidsConf. + concurrentWriterPartitionFlushSize = 0L, // TODO: Fetch from RapidsConf. + baseDebugOutputPath = baseDebugOutputPath ) } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 6b6580df68f..67b9f00f816 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -228,7 +228,8 @@ class GpuEmptyDirectoryDataWriter( class GpuSingleDirectoryDataWriter( description: GpuWriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol) + committer: FileCommitProtocol, + debugOutputBasePath: Option[String]) extends GpuFileFormatDataWriter(description, taskAttemptContext, committer) { // Initialize currentWriter and statsTrackers newOutputWriter() @@ -247,10 +248,16 @@ class GpuSingleDirectoryDataWriter( None, f"-c$fileCounter%03d" + ext) + val debugOutputPath = debugOutputBasePath.map { base => + base + "/DEBUG_" + taskAttemptContext.getTaskAttemptID.toString + + f"-c$fileCounter%03d" + ".debug" + } + currentWriterStatus.writer = description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, - context = taskAttemptContext) + context = taskAttemptContext, + debugOutputPath = debugOutputPath) statsTrackers.foreach(_.newFile(currentPath)) } @@ -293,7 +300,8 @@ class GpuSingleDirectoryDataWriter( class GpuDynamicPartitionDataSingleWriter( description: GpuWriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol) + committer: FileCommitProtocol, + debugOutputBasePath: Option[String]) extends GpuFileFormatDataWriter(description, taskAttemptContext, committer) { /** Wrapper class to index a unique concurrent output writer. 
*/ protected class WriterIndex( @@ -576,10 +584,22 @@ class GpuDynamicPartitionDataSingleWriter( committer.newTaskTempFile(taskAttemptContext, partDir, ext) } + val debugOutputPath = debugOutputBasePath.map { base => + if (customPath.isDefined) { + val hash = customPath.get.hashCode + base + s"/DEBUG_CUSTOM_${hash}_" + taskAttemptContext.getTaskAttemptID.toString + + f"-c$fileCounter%03d" + ".debug" + } else { + base + "/" + partDir.mkString("/") + s"/DEBUG_" + + taskAttemptContext.getTaskAttemptID.toString + f"-c$fileCounter%03d" + ".debug" + } + } + val outWriter = description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, - context = taskAttemptContext) + context = taskAttemptContext, + debugOutputPath = debugOutputPath) statsTrackers.foreach(_.newFile(currentPath)) outWriter @@ -665,8 +685,10 @@ class GpuDynamicPartitionDataConcurrentWriter( description: GpuWriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol, - spec: GpuConcurrentOutputWriterSpec) - extends GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + spec: GpuConcurrentOutputWriterSpec, + debugOutputBasePath: Option[String]) + extends GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, + committer, debugOutputBasePath) with Logging { /** Wrapper class for status and caches of a unique concurrent output writer. */ diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala index 2671323ea8c..c8a9d0c7ed1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala @@ -50,7 +50,8 @@ case class GpuInsertIntoHadoopFsRelationCommand( fileIndex: Option[FileIndex], outputColumnNames: Seq[String], useStableSort: Boolean, - concurrentWriterPartitionFlushSize: Long) + concurrentWriterPartitionFlushSize: Long, + baseDebugOutputPath: Option[String]) extends GpuDataWritingCommand { override def runColumnar(sparkSession: SparkSession, child: SparkPlan): Seq[ColumnarBatch] = { @@ -184,7 +185,8 @@ case class GpuInsertIntoHadoopFsRelationCommand( useStableSort = useStableSort, concurrentWriterPartitionFlushSize = concurrentWriterPartitionFlushSize, forceHiveHashForBucketing = forceHiveHashForBucketing, - numStaticPartitionCols = staticPartitions.size) + numStaticPartitionCols = staticPartitions.size, + baseDebugOutputPath = baseDebugOutputPath) // update metastore partition metadata diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index 1d4bc66a1da..b247647a03d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -183,8 +183,9 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging { new ColumnarOutputWriterFactory { override def newInstance(path: String, dataSchema: StructType, - context: TaskAttemptContext): ColumnarOutputWriter = { - new GpuOrcWriter(path, dataSchema, context) + context: TaskAttemptContext, + debugOutputPath: Option[String]): ColumnarOutputWriter = { + new GpuOrcWriter(path, dataSchema, context, debugOutputPath) } override def 
getFileExtension(context: TaskAttemptContext): String = { @@ -205,8 +206,9 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging { class GpuOrcWriter(override val path: String, dataSchema: StructType, - context: TaskAttemptContext) - extends ColumnarOutputWriter(context, dataSchema, "ORC", true) { + context: TaskAttemptContext, + debugOutputPath: Option[String]) + extends ColumnarOutputWriter(context, dataSchema, "ORC", true, debugOutputPath) { override val tableWriter: TableWriter = { val builder = SchemaUtils diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala index c5d27c1e570..fde588dee62 100644 --- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala +++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/CreateDataSourceTableAsSelectCommandMetaShims.scala @@ -87,6 +87,7 @@ final class CreateDataSourceTableAsSelectCommandMeta( origProvider, newProvider, conf.stableSort, - conf.concurrentWriterPartitionFlushSize) + conf.concurrentWriterPartitionFlushSize, + conf.outputDebugDumpPrefix) } } diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala index 9dd038b8874..d7a29bbc500 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala @@ -82,7 +82,8 @@ final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable, query = wrapped.query, overwrite = wrapped.overwrite, ifPartitionNotExists = wrapped.ifPartitionNotExists, - outputColumnNames = wrapped.outputColumnNames + outputColumnNames = wrapped.outputColumnNames, + baseOutputDebugPath = conf.outputDebugDumpPrefix ) } @@ -96,7 +97,8 @@ case class GpuInsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumnNames: Seq[String]) extends GpuSaveAsHiveFile { + outputColumnNames: Seq[String], + baseOutputDebugPath: Option[String]) extends GpuSaveAsHiveFile { /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the @@ -219,7 +221,8 @@ case class GpuInsertIntoHiveTable( forceHiveHashForBucketing = forceHiveHashForBucketing, partitionAttributes = partitionAttributes, bucketSpec = BucketSpecForHiveShim.getBucketSpec(table, forceHiveHashForBucketing), - options = BucketingUtilsShim.getOptionsWithHiveBucketWrite(table.bucketSpec)) + options = BucketingUtilsShim.getOptionsWithHiveBucketWrite(table.bucketSpec), + baseDebugOutputPath = baseOutputDebugPath) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -363,4 +366,4 @@ case class GpuInsertIntoHiveTable( } override def requireSingleBatch: Boolean = false // TODO: Re-evaluate. If partitioned or bucketed? 
-} \ No newline at end of file +} diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuDataSource.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuDataSource.scala index 0bbdc614967..8a6be9fc241 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuDataSource.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuDataSource.scala @@ -67,7 +67,8 @@ case class GpuDataSource( format: ColumnarFileFormat, mode: SaveMode, data: LogicalPlan, useStableSort: Boolean, - concurrentWriterPartitionFlushSize: Long): GpuInsertIntoHadoopFsRelationCommand = { + concurrentWriterPartitionFlushSize: Long, + baseDebugOutputPath: Option[String]): GpuInsertIntoHadoopFsRelationCommand = { // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; @@ -107,7 +108,8 @@ case class GpuDataSource( fileIndex = fileIndex, outputColumnNames = data.output.map(_.name), useStableSort = useStableSort, - concurrentWriterPartitionFlushSize = concurrentWriterPartitionFlushSize) + concurrentWriterPartitionFlushSize = concurrentWriterPartitionFlushSize, + baseDebugOutputPath = baseDebugOutputPath) } /** @@ -131,7 +133,8 @@ case class GpuDataSource( outputColumnNames: Seq[String], physicalPlan: SparkPlan, useStableSort: Boolean, - concurrentWriterPartitionFlushSize: Long): BaseRelation = { + concurrentWriterPartitionFlushSize: Long, + baseDebugOutputPath: Option[String]): BaseRelation = { val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") @@ -139,7 +142,7 @@ case class GpuDataSource( // Only currently support ColumnarFileFormat val cmd = planForWritingFileFormat(gpuFileFormat, mode, data, useStableSort, - concurrentWriterPartitionFlushSize) + concurrentWriterPartitionFlushSize, baseDebugOutputPath) val resolvedPartCols = cmd.partitionColumns.map { col => // The partition columns created in `planForWritingFileFormat` should always be // `UnresolvedAttribute` with a single name part. 
diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index 71d2892e50e..6188919d2d4 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -104,7 +104,8 @@ object GpuFileFormatWriter extends Logging { useStableSort: Boolean, concurrentWriterPartitionFlushSize: Long, forceHiveHashForBucketing: Boolean = false, - numStaticPartitionCols: Int = 0): Set[String] = { + numStaticPartitionCols: Int = 0, + baseDebugOutputPath: Option[String]): Set[String] = { require(partitionColumns.size >= numStaticPartitionCols) val job = Job.getInstance(hadoopConf) @@ -257,7 +258,8 @@ object GpuFileFormatWriter extends Logging { sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, committer, iterator = iter, - concurrentOutputWriterSpec = concurrentOutputWriterSpec) + concurrentOutputWriterSpec = concurrentOutputWriterSpec, + baseDebugOutputPath = baseDebugOutputPath) }, rddWithNonEmptyPartitions.partitions.indices, (index, res: WriteTaskResult) => { @@ -291,7 +293,8 @@ object GpuFileFormatWriter extends Logging { sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[ColumnarBatch], - concurrentOutputWriterSpec: Option[GpuConcurrentOutputWriterSpec]): WriteTaskResult = { + concurrentOutputWriterSpec: Option[GpuConcurrentOutputWriterSpec], + baseDebugOutputPath: Option[String]): WriteTaskResult = { val jobId = RapidsHadoopWriterUtils.createJobID(jobTrackerId, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -317,14 +320,16 @@ object GpuFileFormatWriter extends Logging { // In case of empty job, leave first partition to save meta for file format like parquet. 
new GpuEmptyDirectoryDataWriter(description, taskAttemptContext, committer) } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { - new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer) + new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer, + baseDebugOutputPath) } else { concurrentOutputWriterSpec match { case Some(spec) => new GpuDynamicPartitionDataConcurrentWriter(description, taskAttemptContext, - committer, spec) + committer, spec, baseDebugOutputPath) case _ => - new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer, + baseDebugOutputPath) } } diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala index 1123f1013f9..9d9182f32c1 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala @@ -55,7 +55,8 @@ case class GpuCreateDataSourceTableAsSelectCommand( origProvider: Class[_], gpuFileFormat: ColumnarFileFormat, useStableSort: Boolean, - concurrentWriterPartitionFlushSize: Long) + concurrentWriterPartitionFlushSize: Long, + baseDebugOutputPath: Option[String]) extends GpuDataWritingCommand { override def runColumnar(sparkSession: SparkSession, child: SparkPlan): Seq[ColumnarBatch] = { @@ -138,7 +139,7 @@ case class GpuCreateDataSourceTableAsSelectCommand( gpuFileFormat = gpuFileFormat) try { dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, useStableSort, - concurrentWriterPartitionFlushSize) + concurrentWriterPartitionFlushSize, baseDebugOutputPath) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala index e7aecf0675f..43c515998a8 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala @@ -80,7 +80,8 @@ final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable, overwrite = wrapped.overwrite, ifPartitionNotExists = wrapped.ifPartitionNotExists, outputColumnNames = wrapped.outputColumnNames, - tmpLocation = cmd.hiveTmpPath.externalTempPath + tmpLocation = cmd.hiveTmpPath.externalTempPath, + baseOutputDebugPath = conf.outputDebugDumpPrefix ) } @@ -95,7 +96,8 @@ case class GpuInsertIntoHiveTable( overwrite: Boolean, ifPartitionNotExists: Boolean, outputColumnNames: Seq[String], - tmpLocation: Path) extends GpuSaveAsHiveFile { + tmpLocation: Path, + baseOutputDebugPath: Option[String]) extends GpuSaveAsHiveFile { /** * Inserts all the rows in the table into Hive. 
Row objects are properly serialized with the @@ -206,7 +208,6 @@ case class GpuInsertIntoHiveTable( val forceHiveHashForBucketing = RapidsConf.FORCE_HIVE_HASH_FOR_BUCKETED_WRITE.get(sparkSession.sessionState.conf) - val writtenParts = gpuSaveAsHiveFile( sparkSession = sparkSession, plan = child, @@ -216,7 +217,8 @@ case class GpuInsertIntoHiveTable( forceHiveHashForBucketing = forceHiveHashForBucketing, partitionAttributes = partitionAttributes, bucketSpec = table.bucketSpec, - options = BucketingUtilsShim.getOptionsWithHiveBucketWrite(table.bucketSpec)) + options = BucketingUtilsShim.getOptionsWithHiveBucketWrite(table.bucketSpec), + baseDebugOutputPath = baseOutputDebugPath) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala index 2b4036e042b..34216111a9d 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala @@ -75,7 +75,8 @@ class GpuWriteFilesMeta( writeFilesExec.partitionColumns, writeFilesExec.bucketSpec, writeFilesExec.options, - writeFilesExec.staticPartitions + writeFilesExec.staticPartitions, + conf.outputDebugDumpPrefix ) } } @@ -89,7 +90,8 @@ case class GpuWriteFilesExec( partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], - staticPartitions: TablePartitionSpec) extends ShimUnaryExecNode with GpuExec { + staticPartitions: TablePartitionSpec, + baseOutputDebugPath: Option[String]) extends ShimUnaryExecNode with GpuExec { override def output: Seq[Attribute] = Seq.empty @@ -145,7 +147,8 @@ case class GpuWriteFilesExec( sparkAttemptNumber, committer, iterator, - concurrentOutputWriterSpec + concurrentOutputWriterSpec, + baseOutputDebugPath ) Iterator(ret) diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index fc4e9273281..6da9ce792ab 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -104,7 +104,8 @@ object GpuFileFormatWriter extends Logging { useStableSort: Boolean, concurrentWriterPartitionFlushSize: Long, forceHiveHashForBucketing: Boolean = false, - numStaticPartitionCols: Int = 0): Set[String] = { + numStaticPartitionCols: Int = 0, + baseDebugOutputPath: Option[String]): Set[String] = { require(partitionColumns.size >= numStaticPartitionCols) val job = Job.getInstance(hadoopConf) @@ -208,7 +209,8 @@ object GpuFileFormatWriter extends Logging { // In this path, Spark version is less than 340 or 'spark.sql.optimizer.plannedWrite.enabled' // is disabled, should sort the data if necessary. 
executeWrite(sparkSession, plan, job, description, committer, outputSpec, - requiredOrdering, partitionColumns, sortColumns, orderingMatched, useStableSort) + requiredOrdering, partitionColumns, sortColumns, orderingMatched, useStableSort, + baseDebugOutputPath) } } @@ -223,7 +225,8 @@ object GpuFileFormatWriter extends Logging { partitionColumns: Seq[Attribute], sortColumns: Seq[Attribute], orderingMatched: Boolean, - useStableSort: Boolean): Set[String] = { + useStableSort: Boolean, + baseDebugOutputPath: Option[String]): Set[String] = { val partitionSet = AttributeSet(partitionColumns) val hasGpuEmpty2Null = plan.find(p => GpuV1WriteUtils.hasGpuEmptyToNull(p.expressions)) .isDefined @@ -282,7 +285,8 @@ object GpuFileFormatWriter extends Logging { sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, committer, iterator = iter, - concurrentOutputWriterSpec = concurrentOutputWriterSpec) + concurrentOutputWriterSpec = concurrentOutputWriterSpec, + baseDebugOutputPath = baseDebugOutputPath) }, rddWithNonEmptyPartitions.partitions.indices, (index, res: WriteTaskResult) => { @@ -389,7 +393,8 @@ object GpuFileFormatWriter extends Logging { sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[ColumnarBatch], - concurrentOutputWriterSpec: Option[GpuConcurrentOutputWriterSpec]): WriteTaskResult = { + concurrentOutputWriterSpec: Option[GpuConcurrentOutputWriterSpec], + baseDebugOutputPath: Option[String]): WriteTaskResult = { val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -415,14 +420,16 @@ object GpuFileFormatWriter extends Logging { // In case of empty job, leave first partition to save meta for file format like parquet. 
new GpuEmptyDirectoryDataWriter(description, taskAttemptContext, committer) } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) { - new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer) + new GpuSingleDirectoryDataWriter(description, taskAttemptContext, committer, + baseDebugOutputPath) } else { concurrentOutputWriterSpec match { case Some(spec) => new GpuDynamicPartitionDataConcurrentWriter(description, taskAttemptContext, - committer, spec) + committer, spec, baseDebugOutputPath) case _ => - new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new GpuDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer, + baseDebugOutputPath) } } diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala index d52c8b47ae7..a0f85f9f265 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala @@ -58,7 +58,8 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { context, dataSchema, rangeName, - includeRetry) { + includeRetry, + None) { // this writer (for tests) doesn't do anything and passes through the // batch passed to it when asked to transform, which is done to @@ -92,7 +93,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { types, "", includeRetry)) - when(mockOutputWriterFactory.newInstance(any(), any(), any())) + when(mockOutputWriterFactory.newInstance(any(), any(), any(), any())) .thenAnswer(_ => mockOutputWriter) } @@ -231,7 +232,8 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { spy(new GpuDynamicPartitionDataSingleWriter( mockJobDescription, mockTaskAttemptContext, - mockCommitter)) + mockCommitter, + None)) } def prepareDynamicPartitionConcurrentWriter(maxWriters: Int, batchSize: Long): @@ -249,7 +251,8 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { mockJobDescription, mockTaskAttemptContext, mockCommitter, - concurrentSpec)) + concurrentSpec, + None)) } test("empty directory data writer") { @@ -291,7 +294,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { withColumnarBatchesVerifyClosed(cbs) { withResource(cbs) { _ => val singleWriter = spy(new GpuSingleDirectoryDataWriter( - mockJobDescription, mockTaskAttemptContext, mockCommitter)) + mockJobDescription, mockTaskAttemptContext, mockCommitter, None)) singleWriter.writeWithIterator(Iterator.empty) singleWriter.commit() } @@ -306,7 +309,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { val cbs = Seq(spy(cb), spy(cb2)) withColumnarBatchesVerifyClosed(cbs) { val singleWriter = spy(new GpuSingleDirectoryDataWriter( - mockJobDescription, mockTaskAttemptContext, mockCommitter)) + mockJobDescription, mockTaskAttemptContext, mockCommitter, None)) singleWriter.writeWithIterator(cbs.iterator) singleWriter.commit() // we write 2 batches @@ -326,7 +329,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { // setting this to 5 makes the single writer have to split at the 5 row boundary when(mockJobDescription.maxRecordsPerFile).thenReturn(5) val singleWriter = spy(new GpuSingleDirectoryDataWriter( - mockJobDescription, mockTaskAttemptContext, mockCommitter)) + 
mockJobDescription, mockTaskAttemptContext, mockCommitter, None)) singleWriter.writeWithIterator(cbs.iterator) singleWriter.commit() From ee7d35d8bd7e245d4405708cda3f108fce27064f Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 11 Dec 2024 15:35:08 -0600 Subject: [PATCH 2/4] Missed one copyright date --- .../sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 4790e643291..48cebee6e41 100644 --- a/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark332db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala * in the Delta Lake project at https://github.com/delta-io/delta. From fb87ad91c6803cd91f8c2bca0e9e7ff9c698a071 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 11 Dec 2024 16:21:39 -0600 Subject: [PATCH 3/4] Review comments --- .../spark/sql/rapids/GpuFileFormatDataWriter.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 67b9f00f816..aca7d0e8751 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -249,8 +249,8 @@ class GpuSingleDirectoryDataWriter( f"-c$fileCounter%03d" + ext) val debugOutputPath = debugOutputBasePath.map { base => - base + "/DEBUG_" + taskAttemptContext.getTaskAttemptID.toString + - f"-c$fileCounter%03d" + ".debug" + base + s"/DEBUG_${taskAttemptContext.getTaskAttemptID}" + + f"_c$fileCounter%03d_${System.nanoTime()}.debug" } currentWriterStatus.writer = description.outputWriterFactory.newInstance( @@ -587,11 +587,11 @@ class GpuDynamicPartitionDataSingleWriter( val debugOutputPath = debugOutputBasePath.map { base => if (customPath.isDefined) { val hash = customPath.get.hashCode - base + s"/DEBUG_CUSTOM_${hash}_" + taskAttemptContext.getTaskAttemptID.toString + - f"-c$fileCounter%03d" + ".debug" + base + s"/DEBUG_CUSTOM_${hash}_${taskAttemptContext.getTaskAttemptID}" + + f"_c$fileCounter%03d_${System.nanoTime()}.debug" } else { - base + "/" + partDir.mkString("/") + s"/DEBUG_" + - taskAttemptContext.getTaskAttemptID.toString + f"-c$fileCounter%03d" + ".debug" + base + s"/${partDir.mkString("/")}/DEBUG_${taskAttemptContext.getTaskAttemptID}" + + f"_c$fileCounter%03d_${System.nanoTime()}.debug" } } From 8f8ce173ad564a3d8193e26b4ad100624e955cf2 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 12 Dec 2024 08:52:09 -0600 Subject: [PATCH 4/4] Remove this from a lambda Signed-off-by: Robert (Bobby) Evans --- .../spark/sql/execution/datasources/GpuWriteFiles.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala 
b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala index 34216111a9d..abcf6178248 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala @@ -135,6 +135,8 @@ case class GpuWriteFilesExec( val description = writeFilesSpec.description val committer = writeFilesSpec.committer val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date()) + val localBaseOutputDebugPath = baseOutputDebugPath + rddWithNonEmptyPartitions.mapPartitionsInternal { iterator => val sparkStageId = TaskContext.get().stageId() val sparkPartitionId = TaskContext.get().partitionId() @@ -148,7 +150,7 @@ case class GpuWriteFilesExec( committer, iterator, concurrentOutputWriterSpec, - baseOutputDebugPath + localBaseOutputDebugPath ) Iterator(ret)
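
For completeness, a hedged sketch of how one of the resulting dump files could be read back with JCudfSerialization in order to replay a failing write against the GPU writer in isolation. This helper is not part of the patch; the file path is a placeholder, and the code assumes readTableFrom returns a pair with a null table at end of stream. A single dump file may hold several concatenated batches because debugWriteBatch appends to one stream for the life of the writer.

    import java.io.{BufferedInputStream, DataInputStream}
    import ai.rapids.cudf.JCudfSerialization
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Placeholder path: substitute an actual DEBUG_*.debug file from the dump prefix.
    val debugFile = new Path("hdfs://scratch/rapids-write-debug/DEBUG_attempt_x_c000_1.debug")
    val fs = debugFile.getFileSystem(new Configuration())
    val in = new DataInputStream(new BufferedInputStream(fs.open(debugFile)))
    try {
      var done = false
      while (!done) {
        // Each call reads one serialized batch written by debugWriteBatch.
        val pair = JCudfSerialization.readTableFrom(in)
        try {
          val table = pair.getTable
          if (table == null) {
            done = true
          } else {
            println(s"read batch: ${table.getRowCount} rows, " +
              s"${table.getNumberOfColumns} columns")
            // Hand `table` to the output format under investigation here.
          }
        } finally {
          pair.close()
        }
      }
    } finally {
      in.close()
    }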