Introduce LORE framework. (NVIDIA#11084)
* Introduce lore id

* Introduce lore id

* Fix type

* Fix type

* Conf

* style

* part

* Dump

* Introduce lore framework

* Add tests.

* Rename test case

Signed-off-by: liurenjie1024 <[email protected]>

* Fix AQE test

* Fix style

* Use args to display lore info.

* Fix build break

* Fix path in loreinfo

* Remove path

* Fix comments

* Update configs

* Fix comments

* Fix config

---------

Signed-off-by: liurenjie1024 <[email protected]>
liurenjie1024 committed Jul 2, 2024
1 parent 3e1ae8c commit 7a26525
Showing 17 changed files with 1,029 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -137,6 +137,8 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.lore.dumpPath"></a>spark.rapids.sql.lore.dumpPath|The path to dump the LORE nodes' input data. This must be set if spark.rapids.sql.lore.idsToDump has been set. The data of each LORE node will be dumped to a subfolder with name 'loreId-<LORE id>' under this path. For more details, please refer to [the LORE documentation](../dev/lore.md).|None|Runtime
<a name="sql.lore.idsToDump"></a>spark.rapids.sql.lore.idsToDump|Specify the LORE ids of operators to dump. The format is a comma separated list of LORE ids. For example: "1[0]" will dump partition 0 of input of gpu operator with lore id 1. For more details, please refer to [the LORE documentation](../dev/lore.md). If this is not set, no data will be dumped.|None|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
70 changes: 70 additions & 0 deletions docs/dev/lore.md
@@ -0,0 +1,70 @@
---
layout: page
title: The Local Replay Framework
nav_order: 13
parent: Developer Overview
---

# Local Replay Framework

## Overview

LORE (the local replay framework) is a tool that allows developers to replay the execution of a
GPU operator in a local environment, so that they can debug and profile the operator for
performance analysis. At a high level it works as follows:

1. Each GPU operator is assigned a LORE id, a unique identifier for the operator. This id is
   guaranteed to be unique within the same query, and to be the same when two SQL executions
   run the same SQL with the same configuration and the same data.
2. In the first run of the query, developers can find the LORE id of the operator they are
   interested in by checking the Spark UI, where the LORE id usually appears in the operator's
   arguments, as illustrated below.
3. In the second run of the query, developers configure the LORE ids of the operators they are
   interested in, and LORE dumps the input data of those operators to the given path.
4. Developers can then copy the dumped data to a local environment and replay the operator
   there.
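
For illustration, a tagged operator might render in the SQL plan roughly like this (the operator
and id here are hypothetical; the exact text comes from each operator's `stringArgs`):

```console
GpuHashAggregate (keys=[a#1], functions=[count(1)]) [loreId=10]
```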

## Configuration

By default, a LORE id is generated for every operator, but users can disable this behavior by
setting `spark.rapids.sql.lore.tag.enabled` to `false`.

To tell LORE which operators you are interested in, set `spark.rapids.sql.lore.idsToDump`. For
example, setting it to "1[*], 2[*], 3[*]" tells LORE to dump all partitions of the input data of
the operators with LORE ids 1, 2, and 3. You can also dump only some partitions of an operator's
input by appending partition numbers to the LORE ids. For example, "1[0 4-6 7], 2[*]" tells LORE
to dump only partitions 0, 4, 5, 6, and 7 for the operator with LORE id 1, but all partitions for
the operator with LORE id 2.
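
As a concrete illustration, both dump settings can be applied at runtime; this is a minimal
sketch with a hypothetical id and path:

```scala
// Second run of the query: dump every partition of the input of the
// operator whose LORE id (found in the Spark UI) is 1.
spark.conf.set("spark.rapids.sql.lore.idsToDump", "1[*]")
spark.conf.set("spark.rapids.sql.lore.dumpPath", "/tmp/lore")
```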

You also need to set `spark.rapids.sql.lore.dumpPath` to tell LORE where to dump the data; its
value should point to a directory. All dumped data of a query will live in this directory. A
typical directory hierarchy would look like this:

```console
+ loreId-10/
- plan.meta
+ input-0/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
- batch-1.parquet
+ partition-1/
- partition.meta
- batch-0.parquet
+ input-1/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
- batch-1.parquet
+ loreId-15/
- plan.meta
+ input-0/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
```
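
Since the dumped batches are plain Parquet files, a quick way to inspect one locally is a read
like the following (a hypothetical path following the layout above; the `.meta` files hold
plan/RDD/partition metadata used for replay):

```scala
// Inspect the first dumped batch of partition 0 of input 0 of LORE node 10.
val batch = spark.read.parquet("/tmp/lore/loreId-10/input-0/partition-0/batch-0.parquet")
batch.printSchema()
batch.show(5, truncate = false)
```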


28 changes: 24 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -15,15 +15,15 @@
*/
package com.nvidia.spark.rapids

-import java.io.{File, FileOutputStream}
+import java.io.{File, FileOutputStream, OutputStream}
import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf._
import ai.rapids.cudf.ColumnWriterOptions._
-import com.nvidia.spark.rapids.Arm.withResource
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration

@@ -82,6 +82,23 @@ object DumpUtils extends Logging {
}
}

/**
* Dump columnar batch to output stream in parquet format. <br>
*
* @param columnarBatch The columnar batch to be dumped; it must be a GPU columnar batch and
*                      should be closed by the caller.
* @param outputStream Will be closed after writing.
*/
def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream): Unit = {
closeOnExcept(outputStream) { _ =>
withResource(GpuColumnVector.from(columnarBatch)) { table =>
withResource(new ParquetDumper(outputStream, table)) { dumper =>
dumper.writeTable(table)
}
}
}
}
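
  // Hypothetical usage sketch for dumpToParquet above (not part of this change):
  //   val out = new java.io.ByteArrayOutputStream()
  //   DumpUtils.dumpToParquet(gpuBatch, out) // gpuBatch stays owned by the caller,
  //                                          // while the stream is closed after writing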

/**
* Debug utility to dump table to parquet file. <br>
* It runs on the GPU. Parquet column names are generated from table column type info. <br>
@@ -129,12 +146,15 @@
}

// parquet dumper
-class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
+class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer
  with AutoCloseable {
-  private[this] val outputStream = new FileOutputStream(path)
  private[this] val tempBuffer = new Array[Byte](128 * 1024)
  private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

+  def this(path: String, table: Table) = {
+    this(new FileOutputStream(path), table)
+  }

val tableWriter: TableWriter = {
// avoid any conversion, just dump as it is
val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table)
@@ -2000,6 +2000,7 @@ case class GpuHashAggregateExec(
|${ExplainUtils.generateFieldString("Functions", aggregateExpressions)}
|${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)}
|${ExplainUtils.generateFieldString("Results", resultExpressions)}
|Lore: ${loreArgs.mkString(", ")}
|""".stripMargin
}

@@ -2130,10 +2131,12 @@
truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields)
val outputString = truncatedString(output, "[", ", ", "]", maxFields)
if (verbose) {
s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)"
s"$nodeName (keys=$keyString, functions=$functionString, output=$outputString) " +
s"""${loreArgs.mkString(", ")}"""
} else {
s"GpuHashAggregate(keys=$keyString, functions=$functionString)," +
s" filters=${aggregateExpressions.map(_.filter)})"
s"$nodeName (keys=$keyString, functions=$functionString)," +
s" filters=${aggregateExpressions.map(_.filter)})" +
s""" ${loreArgs.mkString(", ")}"""
}
}
//
30 changes: 29 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -19,7 +19,10 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.filecache.FileCacheConf
import com.nvidia.spark.rapids.lore.{GpuLore, GpuLoreDumpRDD}
import com.nvidia.spark.rapids.lore.GpuLore.{loreIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG}
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD
@@ -387,7 +390,8 @@ trait GpuExec extends SparkPlan {
this.getTagValue(GpuExec.TASK_METRICS_TAG)

final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val orig = internalDoExecuteColumnar()
+    this.dumpLoreMetaInfo()
+    val orig = this.dumpLoreRDD(internalDoExecuteColumnar())
val metrics = getTaskMetrics
metrics.map { gpuMetrics =>
// This is ugly, but it reduces the need to change all exec nodes, so we are doing it here
@@ -398,5 +402,29 @@
}.getOrElse(orig)
}

override def stringArgs: Iterator[Any] = super.stringArgs ++ loreArgs

protected def loreArgs: Iterator[String] = {
val loreIdStr = loreIdOf(this).map(id => s"[loreId=$id]")
val lorePathStr = getTagValue(LORE_DUMP_PATH_TAG).map(path => s"[lorePath=$path]")
val loreRDDInfoStr = getTagValue(LORE_DUMP_RDD_TAG).map(info => s"[loreRDDInfo=$info]")

List(loreIdStr, lorePathStr, loreRDDInfoStr).flatten.iterator
}
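
  // For illustration of loreArgs above (hypothetical values): with all three tags
  // set, it yields "[loreId=3] [lorePath=/tmp/lore/loreId-3] [loreRDDInfo=...]".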

private def dumpLoreMetaInfo(): Unit = {
getTagValue(LORE_DUMP_PATH_TAG).foreach { rootPath =>
GpuLore.dumpPlan(this, new Path(rootPath))
}
}

protected def dumpLoreRDD(inner: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
getTagValue(LORE_DUMP_RDD_TAG).map { info =>
val rdd = new GpuLoreDumpRDD(info, inner)
rdd.saveMeta()
rdd
}.getOrElse(inner)
}

protected def internalDoExecuteColumnar(): RDD[ColumnarBatch]
}
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF}
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.lore.GpuLore
import com.nvidia.spark.rapids.shims._
import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta}
import org.apache.hadoop.fs.Path
@@ -4733,7 +4734,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
}
-    GpuOverrides.doConvertPlan(wrap, conf, optimizations)
+    val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations)
+    if (conf.isTagLoreIdEnabled) {
+      GpuLore.tagForLore(convertedPlan, conf)
+    } else {
+      convertedPlan
+    }
}
}
}
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable

import com.nvidia.spark.rapids.lore.GpuLore
import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl}

import org.apache.spark.SparkContext
@@ -823,6 +824,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
}

if (rapidsConf.isTagLoreIdEnabled) {
updatedPlan = GpuLore.tagForLore(updatedPlan, rapidsConf)
}

if (rapidsConf.logQueryTransformations) {
logWarning(s"Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
31 changes: 31 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{HashMap, ListBuffer}

import ai.rapids.cudf.Cuda
import com.nvidia.spark.rapids.jni.RmmSpark.OomInjectionType
import com.nvidia.spark.rapids.lore.{LoreId, OutputLoreId}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -2315,6 +2316,28 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val TAG_LORE_ID_ENABLED = conf("spark.rapids.sql.lore.tag.enabled")
.doc("Enable add a LORE id to each gpu plan node")
.internal()
.booleanConf
.createWithDefault(true)

val LORE_DUMP_IDS = conf("spark.rapids.sql.lore.idsToDump")
.doc("Specify the LORE ids of operators to dump. The format is a comma separated list of " +
"LORE ids. For example: \"1[0]\" will dump partition 0 of input of gpu operator " +
"with lore id 1. For more details, please refer to " +
"[the LORE documentation](../dev/lore.md). If this is not set, no data will be dumped.")
.stringConf
.createOptional

val LORE_DUMP_PATH = conf("spark.rapids.sql.lore.dumpPath")
.doc(s"The path to dump the LORE nodes' input data. This must be set if ${LORE_DUMP_IDS.key} " +
"has been set. The data of each LORE node will be dumped to a subfolder with name " +
"'loreId-<LORE id>' under this path. For more details, please refer to " +
"[the LORE documentation](../dev/lore.md).")
.stringConf
.createOptional
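
For reference, the grammar accepted by `spark.rapids.sql.lore.idsToDump` could be parsed along
these lines; this is a hypothetical sketch (the real parser is `OutputLoreId.parse` in
`com.nvidia.spark.rapids.lore` and may differ in detail, e.g. in range bounds):

```scala
// Maps each LORE id to None (all partitions, "*") or an explicit partition set,
// assuming inclusive ranges as described in docs/dev/lore.md, e.g. "1[*], 2[0 4-6 7]".
def parseIdsToDump(spec: String): Map[Int, Option[Set[Int]]] = {
  val entry = """(\d+)\[([^\]]*)\]""".r
  entry.findAllMatchIn(spec).map { m =>
    val partitions = m.group(2).trim match {
      case "*" => None // every partition of this operator's input
      case parts => Some(parts.split("\\s+").toSet.flatMap { (p: String) =>
        p.split("-") match {
          case Array(lo, hi) => (lo.toInt to hi.toInt).toSet // "4-6" -> 4, 5, 6
          case Array(single) => Set(single.toInt)
        }
      })
    }
    m.group(1).toInt -> partitions
  }.toMap
}
```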

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -3130,6 +3153,14 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isDeltaLowShuffleMergeEnabled: Boolean = get(ENABLE_DELTA_LOW_SHUFFLE_MERGE)

lazy val isTagLoreIdEnabled: Boolean = get(TAG_LORE_ID_ENABLED)

lazy val loreDumpIds: Map[LoreId, OutputLoreId] = get(LORE_DUMP_IDS)
.map(OutputLoreId.parse)
.getOrElse(Map.empty)

lazy val loreDumpPath: Option[String] = get(LORE_DUMP_PATH)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because