Introduce LORE framework. (NVIDIA#11084)
* Introduce lore id

* Introduce lore id

* Fix type

* Fix type

* Conf

* style

* part

* Dump

* Introduce lore framework

* Add tests.

* Rename test case

Signed-off-by: liurenjie1024 <[email protected]>

* Fix AQE test

* Fix style

* Use args to display lore info.

* Fix build break

* Fix path in loreinfo

* Remove path

* Fix comments

* Update configs

* Fix comments

* Fix config

---------

Signed-off-by: liurenjie1024 <[email protected]>
liurenjie1024 committed Jul 2, 2024
1 parent e0380b9 commit 7e503bd
Showing 12 changed files with 284 additions and 175 deletions.
12 changes: 2 additions & 10 deletions docs/additional-functionality/advanced_configs.md
@@ -136,16 +136,8 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.lore.dumpPath"></a>spark.rapids.sql.lore.dumpPath|The path to dump the lore nodes' input data. This must be set if spark.rapids.sql.lore.idsToDump has
been set.|None|Runtime
<a name="sql.lore.idsToDump"></a>spark.rapids.sql.lore.idsToDump|Specify the lore ids of operators to dump. The format is a comma separated list of
lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3.
By default, all partitions of operators' input will be dumped. If you want to dump only
some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5
7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id
1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2.
If this is not set, no lore nodes will be dumped.|None|Runtime
<a name="sql.lore.tag.enabled"></a>spark.rapids.sql.lore.tag.enabled|Enable add a lore id to each gpu plan node|true|Runtime
<a name="sql.lore.dumpPath"></a>spark.rapids.sql.lore.dumpPath|The path to dump the LORE nodes' input data. This must be set if spark.rapids.sql.lore.idsToDump has been set. The data of each LORE node will be dumped to a subfolder with name 'loreId-<LORE id>' under this path. For more details, please refer to [the LORE documentation](../dev/lore.md).|None|Runtime
<a name="sql.lore.idsToDump"></a>spark.rapids.sql.lore.idsToDump|Specify the LORE ids of operators to dump. The format is a comma separated list of LORE ids. For example: "1[0]" will dump partition 0 of input of gpu operator with lore id 1. For more details, please refer to [the LORE documentation](../dev/lore.md). If this is not set, no data will be dumped.|None|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
70 changes: 70 additions & 0 deletions docs/dev/lore.md
@@ -0,0 +1,70 @@
---
layout: page
title: The Local Replay Framework
nav_order: 13
parent: Developer Overview
---

# Local Replay Framework

## Overview

LORE (the Local Replay Framework) is a tool that allows developers to replay the execution of a
GPU operator in a local environment, so that they can debug and profile the operator for
performance analysis. At a high level it works as follows:

1. Each GPU operator is assigned a LORE id, a unique identifier for the operator. This id is
   guaranteed to be unique within the same query, and to be the same when two SQL executions
   have the same SQL, the same configuration, and the same data.
2. In the first run of the query, developers find the LORE id of the operator they are
   interested in by checking the Spark UI, where the LORE id usually appears in the arguments
   of the operator (see the sketch after this list).
3. In the second run of the query, developers configure the LORE ids of the operators they are
   interested in, and LORE dumps the input data of those operators to a given path.
4. Developers can then copy the dumped data to a local environment and replay the operator
   there.
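
The exact rendering of the LORE id in the plan is not pinned down here, so the output comment
below is an assumption, shown only to illustrate the first-run step:

```scala
// First run (e.g. in spark-shell, where `spark` is in scope): inspect the
// physical plan to locate the LORE id of the operator of interest.
import spark.implicits._

val df = spark.read.parquet("/data/input").filter($"value" > 0)
df.explain()
// Hypothetical output fragment: GpuFilter (value#1 > 0), [loreId=3]
```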

## Configuration

By default, a LORE id is always generated for each operator, but users can disable this
behavior by setting `spark.rapids.sql.lore.tag.enabled` to `false`.

To tell LORE which operators you are interested in, set `spark.rapids.sql.lore.idsToDump`. For
example, setting it to "1[*], 2[*], 3[*]" tells LORE to dump all partitions of the input data
of the operators with LORE ids 1, 2, and 3. You can also dump only some partitions of an
operator's input by appending partition numbers to the LORE ids. For example, "1[0 4-6 7],
2[*]" tells LORE to dump only partitions 0, 4, 5, 6, and 7 of the operator with LORE id 1, but
all partitions of the operator with LORE id 2.
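
As a minimal sketch of the second-run setup (the dump path below is a placeholder, not taken
from this commit), both configs can be supplied when building the session:

```scala
import org.apache.spark.sql.SparkSession

// Second run: dump partitions 0, 4, 5, 6 and 7 of the input of the operator
// with LORE id 1, and all partitions for the operator with LORE id 2.
val spark = SparkSession.builder()
  .config("spark.rapids.sql.lore.idsToDump", "1[0 4-6 7], 2[*]")
  .config("spark.rapids.sql.lore.dumpPath", "hdfs:///tmp/lore-dump")
  .getOrCreate()
```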

You also need to set `spark.rapids.sql.lore.dumpPath` to tell LORE where to dump the data; its
value should point to a directory. All dumped data of a query will live in this directory. A
typical directory hierarchy looks like this:

```console
+ loreId-10/
- plan.meta
+ input-0/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
- batch-1.parquet
+ partition-1/
- partition.meta
- batch-0.parquet
+ input-1/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
- batch-1.parquet
+ loreId-15/
- plan.meta
+ input-0/
- rdd.meta
+ partition-0/
- partition.meta
- batch-0.parquet
```
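
Since the dumped batches are Parquet files, one quick way to inspect a dump after copying it to
a local machine is plain Spark. This is only a sketch for peeking at the data (assuming the
files read as ordinary Parquet), not the replay tooling itself:

```scala
// Hypothetical local inspection of one dumped batch; the path follows the
// hierarchy above.
val batch = spark.read
  .parquet("/tmp/lore-dump/loreId-10/input-0/partition-0/batch-0.parquet")
batch.printSchema()
batch.show(5)
```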


sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -4721,7 +4721,7 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations)
if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
if (conf.isTagLoreIdEnabled) {
GpuLore.tagForLore(convertedPlan, conf)
} else {
convertedPlan
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -825,7 +825,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
}

if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
if (rapidsConf.isTagLoreIdEnabled) {
updatedPlan = GpuLore.tagForLore(updatedPlan, rapidsConf)
}

34 changes: 21 additions & 13 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{HashMap, ListBuffer}

import ai.rapids.cudf.Cuda
import com.nvidia.spark.rapids.jni.RmmSpark.OomInjectionType
import com.nvidia.spark.rapids.lore.{LoreId, OutputLoreId}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -2332,32 +2333,31 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.createWithDefault(false)

val TAG_LORE_ID_ENABLED = conf("spark.rapids.sql.lore.tag.enabled")
.doc("Enable add a lore id to each gpu plan node")
.doc("Enable add a LORE id to each gpu plan node")
.internal()
.booleanConf
.createWithDefault(true)

val LORE_DUMP_IDS = conf("spark.rapids.sql.lore.idsToDump")
.doc("""Specify the lore ids of operators to dump. The format is a comma separated list of
|lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3.
|By default, all partitions of operators' input will be dumped. If you want to dump only
|some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5
|7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id
| 1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2.
|If this is not set, no lore nodes will be dumped.""".stripMargin)
.doc("Specify the LORE ids of operators to dump. The format is a comma separated list of " +
"LORE ids. For example: \"1[0]\" will dump partition 0 of input of gpu operator " +
"with lore id 1. For more details, please refer to " +
"[the LORE documentation](../dev/lore.md). If this is not set, no data will be dumped.")
.stringConf
.createOptional

val LORE_DUMP_PATH = conf("spark.rapids.sql.lore.dumpPath")
.doc(
s"""The path to dump the lore nodes' input data. This must be set if ${LORE_DUMP_IDS.key} has
|been set.""".stripMargin)
.doc(s"The path to dump the LORE nodes' input data. This must be set if ${LORE_DUMP_IDS.key} " +
"has been set. The data of each LORE node will be dumped to a subfolder with name " +
"'loreId-<LORE id>' under this path. For more details, please refer to " +
"[the LORE documentation](../dev/lore.md).")
.stringConf
.createOptional

val CASE_WHEN_FUSE =
val CASE_WHEN_FUSE =
conf("spark.rapids.sql.case_when.fuse")
.doc("If when branches is greater than 2 and all then/else values in case when are string " +
"scalar, fuse mode improves the performance. By default this is enabled.")
"scalar, fuse mode improves the performance. By default this is enabled.")
.internal()
.booleanConf
.createWithDefault(true)
@@ -3183,6 +3183,14 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val caseWhenFuseEnabled: Boolean = get(CASE_WHEN_FUSE)

lazy val isTagLoreIdEnabled: Boolean = get(TAG_LORE_ID_ENABLED)

lazy val loreDumpIds: Map[LoreId, OutputLoreId] = get(LORE_DUMP_IDS)
.map(OutputLoreId.parse)
.getOrElse(Map.empty)

lazy val loreDumpPath: Option[String] = get(LORE_DUMP_PATH)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
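
The new `loreDumpIds` accessor delegates to `OutputLoreId.parse`, whose implementation is not
shown in this diff. As a rough, hypothetical sketch of the id-list grammar described in the
docs above (`parseLoreIds` is an illustrative name, not the real API):

```scala
// Hypothetical sketch of the "1[0 4-6 7], 2[*]" grammar; not the actual
// OutputLoreId.parse from this commit.
def parseLoreIds(spec: String): Map[Int, Option[Set[Int]]] =
  spec.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
    val Array(id, rest) = entry.split("\\[", 2)
    val body = rest.stripSuffix("]").trim
    val partitions = if (body == "*") {
      None // None means: dump every partition of this operator's input
    } else {
      Some(body.split("\\s+").flatMap { token =>
        token.split("-", 2) match {
          case Array(single) => Seq(single.toInt)    // single index, e.g. "0"
          case Array(lo, hi) => lo.toInt to hi.toInt // inclusive range, e.g. "4-6"
        }
      }.toSet)
    }
    id.trim.toInt -> partitions
  }.toMap

// parseLoreIds("1[0 4-6 7], 2[*]")
//   == Map(1 -> Some(Set(0, 4, 5, 6, 7)), 2 -> None)
```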