Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lore path serde #33

Closed
10 changes: 10 additions & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.lore.dumpPath"></a>spark.rapids.sql.lore.dumpPath|The path to dump the lore nodes' input data. This must be set if spark.rapids.sql.lore.idsToDump has
been set.|None|Runtime
<a name="sql.lore.idsToDump"></a>spark.rapids.sql.lore.idsToDump|Specify the lore ids of operators to dump. The format is a comma separated list of
lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3.
By default, all partitions of operators' input will be dumped. If you want to dump only
some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5
7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id
1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2.
If this is not set, no lore nodes will be dumped.|None|Runtime
<a name="sql.lore.tag.enabled"></a>spark.rapids.sql.lore.tag.enabled|Enable add a lore id to each gpu plan node|true|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
Expand Down
28 changes: 24 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.nvidia.spark.rapids

import java.io.{File, FileOutputStream}
import java.io.{File, FileOutputStream, OutputStream}
import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf._
import ai.rapids.cudf.ColumnWriterOptions._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration

Expand Down Expand Up @@ -82,6 +82,23 @@ object DumpUtils extends Logging {
}
}

/**
* Dump columnar batch to output stream in parquet format. <br>
*
* @param columnarBatch The columnar batch to be dumped, should be GPU columnar batch. It
* should be closed by caller.
* @param outputStream Will be closed after writing.
*/
def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream): Unit = {
closeOnExcept(outputStream) { _ =>
withResource(GpuColumnVector.from(columnarBatch)) { table =>
withResource(new ParquetDumper(outputStream, table)) { dumper =>
dumper.writeTable(table)
}
}
}
}

/**
* Debug utility to dump table to parquet file. <br>
* It's running on GPU. Parquet column names are generated from table column type info. <br>
Expand Down Expand Up @@ -129,12 +146,15 @@ object DumpUtils extends Logging {
}

// parquet dumper
class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer
with AutoCloseable {
private[this] val outputStream = new FileOutputStream(path)
private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

def this(path: String, table: Table) = {
this(new FileOutputStream(path), table)
}

val tableWriter: TableWriter = {
// avoid anything conversion, just dump as it is
val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1758,6 +1758,7 @@ case class GpuHashAggregateExec(
|${ExplainUtils.generateFieldString("Functions", aggregateExpressions)}
|${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)}
|${ExplainUtils.generateFieldString("Results", resultExpressions)}
|Lore: ${loreArgs.mkString(", ")}
|""".stripMargin
}

Expand Down Expand Up @@ -1886,9 +1887,9 @@ case class GpuHashAggregateExec(
truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
val outputString = truncatedString(output, "[", ", ", "]", maxFields)
if (verbose) {
s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
s"$nodeName (keys=$keyString, functions=$functionString, output=$outputString)"
} else {
s"GpuHashAggregate(keys=$keyString, functions=$functionString)," +
s"$nodeName (keys=$keyString, functions=$functionString)," +
s" filters=${aggregateExpressions.map(_.filter)})"
}
}
Expand Down
30 changes: 29 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.filecache.FileCacheConf
import com.nvidia.spark.rapids.lore.{GpuLore, GpuLoreDumpRDD}
import com.nvidia.spark.rapids.lore.GpuLore.{loreIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG}
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD
Expand Down Expand Up @@ -363,7 +366,8 @@ trait GpuExec extends SparkPlan {
this.getTagValue(GpuExec.TASK_METRICS_TAG)

final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val orig = internalDoExecuteColumnar()
this.dumpLoreMetaInfo()
val orig = this.dumpLoreRDD(internalDoExecuteColumnar())
val metrics = getTaskMetrics
metrics.map { gpuMetrics =>
// This is ugly, but it reduces the need to change all exec nodes, so we are doing it here
Expand All @@ -374,5 +378,29 @@ trait GpuExec extends SparkPlan {
}.getOrElse(orig)
}

override def stringArgs: Iterator[Any] = super.stringArgs ++ loreArgs

protected def loreArgs: Iterator[String] = {
val loreIdStr = loreIdOf(this).map(id => s"[loreId=$id]")
val lorePathStr = getTagValue(LORE_DUMP_PATH_TAG).map(path => s"[lorePath=$path]")
val loreRDDInfoStr = getTagValue(LORE_DUMP_RDD_TAG).map(info => s"[loreRDDInfo=$info]")

List(loreIdStr, lorePathStr, loreRDDInfoStr).flatten.iterator
}

private def dumpLoreMetaInfo(): Unit = {
getTagValue(LORE_DUMP_PATH_TAG).foreach { rootPath =>
GpuLore.dumpPlan(this, new Path(rootPath))
}
}

protected def dumpLoreRDD(inner: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
getTagValue(LORE_DUMP_RDD_TAG).map { info =>
val rdd = new GpuLoreDumpRDD(info, inner)
rdd.saveMeta()
rdd
}.getOrElse(inner)
}

protected def internalDoExecuteColumnar(): RDD[ColumnarBatch]
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF}
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.lore.GpuLore
import com.nvidia.spark.rapids.shims._
import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta}
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -4707,7 +4708,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
}
GpuOverrides.doConvertPlan(wrap, conf, optimizations)
val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations)
if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
GpuLore.tagForLore(convertedPlan, conf)
} else {
convertedPlan
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable

import com.nvidia.spark.rapids.lore.GpuLore
import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl}

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -823,6 +824,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
}

if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
updatedPlan = GpuLore.tagForLore(updatedPlan, rapidsConf)
}

if (rapidsConf.logQueryTransformations) {
logWarning(s"Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
Expand Down
23 changes: 23 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2300,6 +2300,29 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val TAG_LORE_ID_ENABLED = conf("spark.rapids.sql.lore.tag.enabled")
.doc("Enable add a lore id to each gpu plan node")
.booleanConf
.createWithDefault(true)

val LORE_DUMP_IDS = conf("spark.rapids.sql.lore.idsToDump")
.doc("""Specify the lore ids of operators to dump. The format is a comma separated list of
|lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3.
|By default, all partitions of operators' input will be dumped. If you want to dump only
|some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5
|7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id
| 1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2.
|If this is not set, no lore nodes will be dumped.""".stripMargin)
.stringConf
.createOptional

val LORE_DUMP_PATH = conf("spark.rapids.sql.lore.dumpPath")
.doc(
s"""The path to dump the lore nodes' input data. This must be set if ${LORE_DUMP_IDS.key} has
|been set.""".stripMargin)
.stringConf
.createOptional

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

Expand Down
Loading