Skip to content

Commit

Permalink
Have "dump always" dump input files before trying to decode them (#11569)
Browse files Browse the repository at this point in the history

Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Oct 9, 2024
1 parent b9b7629 commit b715ef2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 43 deletions.
34 changes: 18 additions & 16 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2822,6 +2822,12 @@ object MakeOrcTableProducer extends Logging {
debugDumpPrefix: Option[String],
debugDumpAlways: Boolean
): GpuDataProducer[Table] = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
}
}
if (useChunkedReader) {
OrcTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
parseOpts, buffer, offset, bufferSize, metrics, isSchemaCaseSensitive, readDataSchema,
Expand All @@ -2838,19 +2844,17 @@ object MakeOrcTableProducer extends Logging {
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
s", data dumped to $p"
if (!debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
s", data dumped to $p"
} else {
""
}
}.getOrElse("")
throw new IOException(s"Error when processing ${splits.mkString("; ")}$dumpMsg", e)
}
}
closeOnExcept(table) { _ =>
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
}
}
if (readDataSchema.length < table.getNumberOfColumns) {
throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
s"but read ${table.getNumberOfColumns} from ${splits.mkString("; ")}")
Expand Down Expand Up @@ -2895,8 +2899,12 @@ case class OrcTableReader(
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
s", data dumped to $p"
if (!debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
s", data dumped to $p"
} else {
""
}
}.getOrElse("")
throw new IOException(s"Error when processing $splitsString$dumpMsg", e)
}
Expand All @@ -2914,12 +2922,6 @@ case class OrcTableReader(
}

override def close(): Unit = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, bufferSize, prefix, ".orc")
logWarning(s"Wrote data for $splitsString to $p")
}
}
reader.close()
buffer.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2613,6 +2613,12 @@ object MakeParquetTableProducer extends Logging {
debugDumpPrefix: Option[String],
debugDumpAlways: Boolean
): GpuDataProducer[Table] = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
}
}
if (useChunkedReader) {
ParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes,
opts, buffer, offset,
Expand All @@ -2631,19 +2637,17 @@ object MakeParquetTableProducer extends Logging {
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
s", data dumped to $p"
if (!debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
s", data dumped to $p"
} else {
""
}
}.getOrElse("")
throw new IOException(s"Error when processing ${splits.mkString("; ")}$dumpMsg", e)
}
}
closeOnExcept(table) { _ =>
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
logWarning(s"Wrote data for ${splits.mkString(", ")} to $p")
}
}
GpuParquetScan.throwIfRebaseNeededInExceptionMode(table, dateRebaseMode,
timestampRebaseMode)
if (readDataSchema.length < table.getNumberOfColumns) {
Expand Down Expand Up @@ -2695,8 +2699,12 @@ case class ParquetTableReader(
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
s", data dumped to $p"
if (!debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
s", data dumped to $p"
} else {
""
}
}.getOrElse("")
throw new IOException(s"Error when processing $splitsString$dumpMsg", e)
}
Expand All @@ -2716,12 +2724,6 @@ case class ParquetTableReader(
}

override def close(): Unit = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
logWarning(s"Wrote data for $splitsString to $p")
}
}
reader.close()
buffer.close()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -325,6 +325,12 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase =>
hostBuf: HostMemoryBuffer,
bufSize: Long,
splits: Array[PartitionedFile]): Table = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
logWarning(s"Wrote data for ${splits.mkString("; ")} to $p")
}
}
val readOpts = CudfAvroOptions.builder()
.includeColumn(readDataSchema.fieldNames.toSeq: _*)
.build()
Expand All @@ -341,20 +347,16 @@ trait GpuAvroReaderBase extends Logging { self: FilePartitionReaderBase =>
} catch {
case e: Exception =>
val dumpMsg = debugDumpPrefix.map { prefix =>
val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
s", data dumped to $p"
if (!debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
s", data dumped to $p"
} else {
""
}
}.getOrElse("")
throw new IOException(
s"Error when processing file splits [${splits.mkString("; ")}]$dumpMsg", e)
}
closeOnExcept(table) { _ =>
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, hostBuf, 0, bufSize, prefix, ".avro")
logWarning(s"Wrote data for ${splits.mkString("; ")} to $p")
}
}
}
table
}

Expand Down

0 comments on commit b715ef2

Please sign in to comment.