From 208b79b58eb0ceedad6d27105dbab13693191b61 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 6 Jun 2024 18:46:52 +0800 Subject: [PATCH 01/17] Introduce lore id --- .../com/nvidia/spark/rapids/GpuExec.scala | 9 +++++ .../nvidia/spark/rapids/GpuOverrides.scala | 8 +++- .../spark/rapids/GpuTransitionOverrides.scala | 7 +++- .../com/nvidia/spark/rapids/RapidsConf.scala | 5 +++ .../com/nvidia/spark/rapids/lore/IdGen.scala | 39 +++++++++++++++++++ 5 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index ec87dd62d6c..985260249d7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.filecache.FileCacheConf +import com.nvidia.spark.rapids.lore.IdGen.lordIdOf import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.spark.internal.Logging @@ -373,6 +374,14 @@ trait GpuExec extends SparkPlan { } }.getOrElse(orig) } + + override def nodeName: String = { + lordIdOf(this) match { + case Some(loreId) => s"${super.nodeName} [loreId=$loreId]" + case None => s"${super.nodeName} [loreId=unknown]" + } + } + } protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 295480d24cc..491271e33ea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import ai.rapids.cudf.DType import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF} import com.nvidia.spark.rapids.jni.GpuTimeZoneDB +import com.nvidia.spark.rapids.lore.IdGen import com.nvidia.spark.rapids.shims._ import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta} import org.apache.hadoop.fs.Path @@ -4707,7 +4708,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { } } } - GpuOverrides.doConvertPlan(wrap, conf, optimizations) + val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations) + if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { + IdGen.tagLoreId(convertedPlan) + } else { + convertedPlan + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 48f9de5a61a..49d76b7d822 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -21,9 +21,10 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.mutable +import com.nvidia.spark.rapids.lore.IdGen import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl} - import org.apache.spark.SparkContext + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode @@ -823,6 +824,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan) } + if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { + updatedPlan = IdGen.tagLoreId(updatedPlan) + } + if (rapidsConf.logQueryTransformations) { logWarning(s"Transformed query:" + s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 5203e926efa..bbb7db473bd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2300,6 +2300,11 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .booleanConf .createWithDefault(false) + val TAG_LORE_ID_ENABLED = conf("spark.rapids.sql.lore.tag.enabled") + .doc("Enable add a lore id to each gpu plan node") + .booleanConf + .createWithDefault(true) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala new file mode 100644 index 00000000000..542bee0d4a7 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala @@ -0,0 +1,39 @@ +package com.nvidia.spark.rapids.lore + +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import java.util.concurrent.atomic.AtomicInteger + +import com.nvidia.spark.rapids.GpuExec + +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} + +object IdGen { + val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[GpuExec]("rapids.gpu.lore.id") + + /** + * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. + */ + private val idGen: ConcurrentMap[String, AtomicInteger] = + new ConcurrentHashMap[String, AtomicInteger]() + + private def nextLoreIdOfSparkPlan(plan: SparkPlan): Int = { + val executionId = plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() + } + + def tagLoreId(sparkPlan: SparkPlan): SparkPlan = { + sparkPlan.foreachUp { + case g: GpuExec => + val loreId = nextLoreIdOfSparkPlan(g) + g.setTagValue(LORE_ID_TAG, loreId) + case _ => + } + + sparkPlan + } + + def lordIdOf(node: SparkPlan): Option[String] = { + node.getTagValue(LORE_ID_TAG) + } +} From cbc80d4248073b0c33c1ab490ed479f843d00298 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 6 Jun 2024 18:48:32 +0800 Subject: [PATCH 02/17] Introduce lore id --- .../src/main/scala/com/nvidia/spark/rapids/GpuExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 985260249d7..3a6d831c993 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -374,13 +374,12 @@ trait GpuExec extends SparkPlan { } }.getOrElse(orig) } - + override def nodeName: String = { lordIdOf(this) match { case Some(loreId) => s"${super.nodeName} [loreId=$loreId]" case None => s"${super.nodeName} [loreId=unknown]" } - } } protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] From 2bc2ae5a518d086b5c84eafbecfb13d1e2e0dfec Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 6 Jun 2024 18:52:46 +0800 Subject: [PATCH 03/17] Fix type --- .../src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala index 542bee0d4a7..258226d9039 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} object IdGen { - val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[GpuExec]("rapids.gpu.lore.id") + val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id") /** * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. @@ -26,7 +26,7 @@ object IdGen { sparkPlan.foreachUp { case g: GpuExec => val loreId = nextLoreIdOfSparkPlan(g) - g.setTagValue(LORE_ID_TAG, loreId) + g.setTagValue(LORE_ID_TAG, loreId.toString) case _ => } From 3705aa49a1896996acb53f2febd9c79fbfbec53e Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 6 Jun 2024 18:54:49 +0800 Subject: [PATCH 04/17] Fix type --- .../com/nvidia/spark/rapids/lore/IdGen.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala index 258226d9039..391a4e88a86 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.nvidia.spark.rapids.lore import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} From 29abddf66f17e19f6cd867f0ccad66b70f2b5c9d Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Tue, 11 Jun 2024 13:11:37 +0800 Subject: [PATCH 05/17] Conf --- .../com/nvidia/spark/rapids/RapidsConf.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index bbb7db473bd..817d0c29699 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2305,6 +2305,23 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. .booleanConf .createWithDefault(true) + val LORE_DUMP_IDS = conf("spark.rapids.sql.lore.idsToDump") + .doc("""Specify the lore ids of operators to dump. The format is a comma separated list of + |lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3. + |By default, all partitions of operators' input will be dumped. If you want to dump only + |some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5 + |7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id + | 1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2. + |If this is not set, no lore nodes will be dumped.""".stripMargin) + .stringConf + .createWithDefault("") + + val LORE_DUMP_PATH = conf("spark.rapids.sql.lore.dumpPath") + .doc( + s"""The path to dump the lore nodes' input data. This must be set if ${LORE_DUMP_IDS.key} has + |been set.""".stripMargin) + .stringConf + private def printSectionHeader(category: String): Unit = println(s"\n### $category") From b435fdc58994d69ce4944fe9090dbb7c678d31ca Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Tue, 11 Jun 2024 13:22:40 +0800 Subject: [PATCH 06/17] style --- .../scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 49d76b7d822..14c6cf582ec 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -23,8 +23,8 @@ import scala.collection.mutable import com.nvidia.spark.rapids.lore.IdGen import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl} -import org.apache.spark.SparkContext +import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.IdentityBroadcastMode From a2206e79e28da43ef5d4b36046029052a3763e2f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 12 Jun 2024 10:06:20 +0800 Subject: [PATCH 07/17] part --- .../com/nvidia/spark/rapids/DumpUtils.scala | 13 +++- .../com/nvidia/spark/rapids/lore/IdGen.scala | 29 ++++++-- .../spark/rapids/lore/OutputLoreId.scala | 73 +++++++++++++++++++ .../com/nvidia/spark/rapids/lore/dump.scala | 50 +++++++++++++ .../nvidia/spark/rapids/lore/package.scala | 22 ++++++ .../spark/rapids/OutputLoreIdSuite.scala | 41 +++++++++++ 6 files changed, 218 insertions(+), 10 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala create mode 100644 sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index bf949897c78..cc024561875 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -15,7 +15,7 @@ */ package com.nvidia.spark.rapids -import java.io.{File, FileOutputStream} +import java.io.{File, FileOutputStream, OutputStream} import java.util.Random import scala.collection.mutable @@ -82,6 +82,10 @@ object DumpUtils extends Logging { } } + def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream) = { + + } + /** * Debug utility to dump table to parquet file.
* It's running on GPU. Parquet column names are generated from table column type info.
@@ -129,12 +133,15 @@ object DumpUtils extends Logging { } // parquet dumper -class ParquetDumper(path: String, table: Table) extends HostBufferConsumer +class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer with AutoCloseable { - private[this] val outputStream = new FileOutputStream(path) private[this] val tempBuffer = new Array[Byte](128 * 1024) private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]() + def this(path: String, table: Table) = { + this(new FileOutputStream(path), table) + } + val tableWriter: TableWriter = { // avoid anything conversion, just dump as it is val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala index 391a4e88a86..999d6c64004 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,15 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.AtomicInteger import com.nvidia.spark.rapids.GpuExec +import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} object IdGen { val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id") + val LORE_OUTPUT_PATH_TAG: TreeNodeTag[LoreOutputInfo] = new TreeNodeTag[LoreOutputInfo]( + "rapids.gpu.lore.output.path") /** * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. @@ -33,16 +36,28 @@ object IdGen { private val idGen: ConcurrentMap[String, AtomicInteger] = new ConcurrentHashMap[String, AtomicInteger]() - private def nextLoreIdOfSparkPlan(plan: SparkPlan): Int = { - val executionId = plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() + private def nextLoreIdOfSparkPlan(plan: SparkPlan): Option[Int] = { + // When the execution id is not set, it means there is no actual execution happening, in this + // case we don't need to generate lore id. + Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) + .map { executionId => + idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() + } } - def tagLoreId(sparkPlan: SparkPlan): SparkPlan = { + def tagLoreId(sparkPlan: SparkPlan, outputLoreIds: OutputLoreIds, loreOutputRootPath: Path) + : SparkPlan = { sparkPlan.foreachUp { case g: GpuExec => - val loreId = nextLoreIdOfSparkPlan(g) - g.setTagValue(LORE_ID_TAG, loreId.toString) + nextLoreIdOfSparkPlan(g).foreach { id => + g.setTagValue(LORE_ID_TAG, id.toString) + val currentExecRootPath = new Path(loreOutputRootPath, s"loreId=$id") + g.children.zipWithIndex.foreach { + case (child, idx) => + val childOutputPath = new Path(currentExecRootPath, s"child=$idx") + child.setTagValue(LORE_OUTPUT_PATH_TAG, LoreOutputInfo(childOutputPath, outputLoreIds)) + } + } case _ => } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala new file mode 100644 index 00000000000..2c2ce287f89 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.lore + +import org.apache.hadoop.fs.Path + +case class OutputLoreId(loreId: LoreId, partitionIds: Set[Int]) { + def outputAllParitions: Boolean = partitionIds.isEmpty + + def shouldOutputPartition(partitionId: Int): Boolean = outputAllParitions || + partitionIds.contains(partitionId) +} + +case class LoreOutputInfo(outputLoreId: OutputLoreId, path: Path) + +object OutputLoreId { + private val PARTITION_ID_RANGE_REGEX = raw"(\d+)-(\d+)".r("start", "end") + private val PARTITION_ID_REGEX = raw"(\d+)".r("partitionId") + private val PARTITION_IDS_REGEX = raw"($PARTITION_ID_RANGE_REGEX|$PARTITION_ID_REGEX)" + + raw"( +($PARTITION_ID_RANGE_REGEX|$PARTITION_ID_REGEX))*".r + private val PARTITION_ID_SEP_REGEX = raw" +".r + + private val OUTPUT_LORE_ID_SEP_REGEX = ", *".r + private val OUTPUT_LORE_ID_REGEX = + raw"(?\d+)(\[(?$PARTITION_IDS_REGEX)\])?".r + + def apply(loreId: Int): OutputLoreId = OutputLoreId(loreId, Set.empty) + + def apply(inputStr: String): OutputLoreId = { + println("inputStr for output lore id: " + inputStr) + OUTPUT_LORE_ID_REGEX.findFirstMatchIn(inputStr).map { m => + val loreId = m.group("loreId").toInt + val partitionIds: Set[Int] = m.group("partitionIds") match { + case null => Set.empty + case partitionIdsStr => + println("partitionIdsStr: " + partitionIdsStr) + PARTITION_ID_SEP_REGEX.split(partitionIdsStr).flatMap { + case PARTITION_ID_REGEX(partitionId) => + Seq(partitionId.toInt) + case PARTITION_ID_RANGE_REGEX(start, end) => + start.toInt until end.toInt + case partitionIdStr => throw new IllegalArgumentException(s"Invalid partition id: " + + s"$partitionIdStr") + }.toSet + } + OutputLoreId(loreId, partitionIds) + }.getOrElse(throw new IllegalArgumentException(s"Invalid output lore ids: $inputStr")) + } + + def parse(inputStr: String): OutputLoreIds = { + require(inputStr != null, "inputStr should not be null") + + OUTPUT_LORE_ID_SEP_REGEX.split(inputStr).map(OutputLoreId(_)).map { outputLoreId => + outputLoreId.loreId -> outputLoreId + }.toMap + } +} + + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala new file mode 100644 index 00000000000..f3c55653d56 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -0,0 +1,50 @@ +import com.nvidia.spark.rapids.GpuExec +import com.nvidia.spark.rapids.lore.LoreOutputInfo +import org.apache.hadoop.fs.Path + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo) extends UnaryExecNode + with GpuExec { + override def output: Seq[Attribute] = child.output + + override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException("GpuLoreDumpExec does not support row mode") + } + + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = ??? + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + GpuLoreDumpExec(newChild, loreOutputInfo) + + def dumpNextBatch(batch: ColumnarBatch) = { + + } +} + +object GpuLoreDumpExec { + def dumpNextBatch(batchId: Long, batch: ColumnarBatch, rootPath: Path) = { + + } +} + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala new file mode 100644 index 00000000000..6ca048e2f5a --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +package object lore { + type LoreId = Int + type OutputLoreIds = Map[LoreId, OutputLoreId] +} diff --git a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala new file mode 100644 index 00000000000..69632a1e06e --- /dev/null +++ b/sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala @@ -0,0 +1,41 @@ +package com.nvidia.spark.rapids + +import com.nvidia.spark.rapids.lore.OutputLoreId +import org.scalatest.funsuite.AnyFunSuite + +class OutputLoreIdSuite extends AnyFunSuite { + test("Parse one output lore id") { + val expectedLoreIds = Map(1 -> OutputLoreId(1, Set(1, 2, 4, 8))) + val loreIds = OutputLoreId.parse("1[1 2 4 8]") + + assert(loreIds == expectedLoreIds) + } + + test("Parse multi output lore id") { + val expectedLoreIds = Map( + 1 -> OutputLoreId(1, Set(1, 2, 4, 8)), + 2 -> OutputLoreId(2, Set(1, 4, 5, 6, 7, 8, 100)) + ) + val loreIds = OutputLoreId.parse("1[1 2 4 8], 2[1 4-9 100]") + + assert(loreIds == expectedLoreIds) + } + + test("Parse empty output lore id") { + val expectedLoreIds = Map(1 -> OutputLoreId(1), 2 -> OutputLoreId(2)) + val loreIds = OutputLoreId.parse("1 , 2") + + assert(loreIds == expectedLoreIds) + } + + test("Parse mixed") { + val expectedLoreIds = Map( + 1 -> OutputLoreId(1), + 2 -> OutputLoreId(2, Set(4, 5, 8)), + 3 -> OutputLoreId(3, Set(1, 2, 4, 8)) + ) + val loreIds = OutputLoreId.parse("1, 2[4-6 8] , 3[1 2 4 8]") + + assert(loreIds == expectedLoreIds) + } +} From be8e97b26ee7ef2c42dd6ac322b808e7693ed0a5 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 14 Jun 2024 14:23:55 +0800 Subject: [PATCH 08/17] Dump --- .../com/nvidia/spark/rapids/DumpUtils.scala | 17 +++++++- .../com/nvidia/spark/rapids/lore/dump.scala | 42 ++++++++++++------- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index cc024561875..c3b8c6ba62e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf._ import ai.rapids.cudf.ColumnWriterOptions._ -import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration @@ -82,8 +82,21 @@ object DumpUtils extends Logging { } } + /** + * Dump columnar batch to output stream in parquet format.
+ * + * @param columnarBatch The columnar batch to be dumped, should be GPU columnar batch. It + * should be closed by caller. + * @param outputStream Will be closed after writing. + */ def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream) = { - + closeOnExcept(outputStream) { _ => + withResource(GpuColumnVector.from(columnarBatch)) { table => + withResource(new ParquetDumper(outputStream, table)) { dumper => + dumper.writeTable(table) + } + } + } } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index f3c55653d56..c97e2419deb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -1,12 +1,15 @@ -import com.nvidia.spark.rapids.GpuExec -import com.nvidia.spark.rapids.lore.LoreOutputInfo +package com.nvidia.spark.rapids.lore + +import com.nvidia.spark.rapids.{DumpUtils, GpuExec} import org.apache.hadoop.fs.Path +import org.apache.spark.{OneToOneDependency, Partition, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration /* * Copyright (c) 2024, NVIDIA CORPORATION. @@ -24,7 +27,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * limitations under the License. */ -case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo) extends UnaryExecNode +case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo, + hadoopConf: SerializableConfiguration) extends UnaryExecNode with GpuExec { override def output: Seq[Attribute] = child.output @@ -32,19 +36,29 @@ case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo) ext throw new UnsupportedOperationException("GpuLoreDumpExec does not support row mode") } - override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = ??? + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { + val input = child.executeColumnar() + new GpuLoreDumpRDD(input, hadoopConf, loreOutputInfo) + } override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - GpuLoreDumpExec(newChild, loreOutputInfo) - - def dumpNextBatch(batch: ColumnarBatch) = { - - } + GpuLoreDumpExec(newChild, loreOutputInfo, hadoopConf) } -object GpuLoreDumpExec { - def dumpNextBatch(batchId: Long, batch: ColumnarBatch, rootPath: Path) = { +class GpuLoreDumpRDD(input: RDD[ColumnarBatch], hadoopConf: SerializableConfiguration, + loreOutputInfo: LoreOutputInfo) extends RDD[ColumnarBatch](input.context, + Seq(new OneToOneDependency(input))) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = input + .compute(split, context).zipWithIndex.map { f => + val (batch, index) = f + val outputPath = columnarBatchPath(split, index) + val outputStream = outputPath.getFileSystem(hadoopConf.value).create(outputPath, false) + DumpUtils.dumpToParquet(batch, outputStream) + batch + } - } -} + override protected def getPartitions: Array[Partition] = input.getPartitions + private def columnarBatchPath(partition: Partition, batchIndex: Int): Path = new Path( + loreOutputInfo.path, s"part-${partition.index}/batch-$batchIndex.parquet") +} \ No newline at end of file From 85eb43391ff14fa5c992f0f17a7f5bf164533298 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 17 Jun 2024 15:45:08 +0800 Subject: [PATCH 09/17] Introduce lore framework --- .../com/nvidia/spark/rapids/DumpUtils.scala | 2 +- .../com/nvidia/spark/rapids/GpuExec.scala | 13 ++- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../spark/rapids/GpuTransitionOverrides.scala | 2 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 3 +- .../nvidia/spark/rapids/lore/GpuLore.scala | 94 +++++++++++++++ .../com/nvidia/spark/rapids/lore/IdGen.scala | 57 ++++++--- .../com/nvidia/spark/rapids/lore/dump.scala | 109 +++++++++++++----- .../com/nvidia/spark/rapids/lore/replay.scala | 93 +++++++++++++++ .../nvidia/spark/rapids/GpuLoreSuite.scala | 25 ++++ 10 files changed, 345 insertions(+), 55 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index c3b8c6ba62e..21d2de6ad68 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -89,7 +89,7 @@ object DumpUtils extends Logging { * should be closed by caller. * @param outputStream Will be closed after writing. */ - def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream) = { + def dumpToParquet(columnarBatch: ColumnarBatch, outputStream: OutputStream): Unit = { closeOnExcept(outputStream) { _ => withResource(GpuColumnVector.from(columnarBatch)) { table => withResource(new ParquetDumper(outputStream, table)) { dumper => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 3a6d831c993..2745d6fed9a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -19,8 +19,10 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.filecache.FileCacheConf -import com.nvidia.spark.rapids.lore.IdGen.lordIdOf +import com.nvidia.spark.rapids.lore.GpuLore +import com.nvidia.spark.rapids.lore.IdGen.{lordIdOf, LORE_DUMP_PATH_TAG} import com.nvidia.spark.rapids.shims.SparkShimImpl +import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD @@ -364,6 +366,7 @@ trait GpuExec extends SparkPlan { this.getTagValue(GpuExec.TASK_METRICS_TAG) final override def doExecuteColumnar(): RDD[ColumnarBatch] = { + this.dumpLoreMetaInfo() val orig = internalDoExecuteColumnar() val metrics = getTaskMetrics metrics.map { gpuMetrics => @@ -378,7 +381,13 @@ trait GpuExec extends SparkPlan { override def nodeName: String = { lordIdOf(this) match { case Some(loreId) => s"${super.nodeName} [loreId=$loreId]" - case None => s"${super.nodeName} [loreId=unknown]" + case None => s"${super.nodeName}" + } + } + + private def dumpLoreMetaInfo(): Unit = { + getTagValue(LORE_DUMP_PATH_TAG).foreach { rootPath => + GpuLore.dumpPlan(this, new Path(rootPath)) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 491271e33ea..c3f1348d6b5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -4710,7 +4710,7 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { } val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations) if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { - IdGen.tagLoreId(convertedPlan) + IdGen.tagForLore(convertedPlan, conf) } else { convertedPlan } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 14c6cf582ec..9fa9820b6ab 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -825,7 +825,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { } if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { - updatedPlan = IdGen.tagLoreId(updatedPlan) + updatedPlan = IdGen.tagForLore(updatedPlan, rapidsConf) } if (rapidsConf.logQueryTransformations) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 817d0c29699..5fe896fe100 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2314,13 +2314,14 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. | 1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2. |If this is not set, no lore nodes will be dumped.""".stripMargin) .stringConf - .createWithDefault("") + .createOptional val LORE_DUMP_PATH = conf("spark.rapids.sql.lore.dumpPath") .doc( s"""The path to dump the lore nodes' input data. This must be set if ${LORE_DUMP_IDS.key} has |been set.""".stripMargin) .stringConf + .createOptional private def printSectionHeader(category: String): Unit = println(s"\n### $category") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala new file mode 100644 index 00000000000..d92b26fb29e --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.lore + +import scala.reflect.ClassTag + +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.GpuExec +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.DataType + +case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int]) + +case class LoreRDDPartitionMeta(numBatches: Int, dataType: Seq[DataType]) + +trait GpuLoreRDD { + val rootPath: Path + + def pathOfMeta: Path = new Path(rootPath, "rdd.meta") + + def pathOfPartition(partitionIndex: Int): Path = { + new Path(rootPath, s"partition-$partitionIndex") + } + + def pathOfPartitionMeta(partitionIndex: Int): Path = { + new Path(pathOfPartition(partitionIndex), "partition.meta") + } + + def pathOfBatch(partitionIndex: Int, batchIndex: Int): Path = { + new Path(pathOfPartition(partitionIndex), s"batch-$batchIndex.parquet") + } +} + + +object GpuLore { + def pathOfRootPlanMeta(rootPath: Path): Path = { + new Path(rootPath, "plan.meta") + } + + def dumpPlan[T <: SparkPlan : ClassTag](plan: T, rootPath: Path): Unit = { + dumpObject(plan, pathOfRootPlanMeta(rootPath), plan.session.sparkContext.hadoopConfiguration) + } + + def dumpObject[T: ClassTag](obj: T, path: Path, hadoopConf: Configuration): Unit = { + withResource(path.getFileSystem(hadoopConf)) { fs => + withResource(fs.create(path, false)) { fout => + val serializerStream = SparkEnv.get.serializer.newInstance().serializeStream(fout) + withResource(serializerStream) { ser => + ser.writeObject(obj) + } + } + } + } + + def loadObject[T: ClassTag](path: Path, hadoopConf: Configuration): T = { + withResource(path.getFileSystem(hadoopConf)) { fs => + withResource(fs.open(path)) { fin => + val serializerStream = SparkEnv.get.serializer.newInstance().deserializeStream(fin) + withResource(serializerStream) { ser => + ser.readObject().asInstanceOf[T] + } + } + } + } + + def pathOfChild(rootPath: Path, childIndex: Int): Path = { + new Path(rootPath, s"child-$childIndex") + } + + def restoreGpuExec(rootPath: Path, hadoopConf: Configuration): GpuExec = { + val rootExec = loadObject[GpuExec](pathOfRootPlanMeta(rootPath), hadoopConf) + // Load children + rootExec.withNewChildren(rootExec.children.indices.map(GpuLoreReplayExec(_, rootPath))) + .asInstanceOf[GpuExec] + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala index 999d6c64004..8f0ca027bb1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.lore import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.AtomicInteger -import com.nvidia.spark.rapids.GpuExec +import com.nvidia.spark.rapids.{GpuExec, RapidsConf} import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.trees.TreeNodeTag @@ -27,8 +27,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} object IdGen { val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id") - val LORE_OUTPUT_PATH_TAG: TreeNodeTag[LoreOutputInfo] = new TreeNodeTag[LoreOutputInfo]( - "rapids.gpu.lore.output.path") + val LORE_DUMP_PATH_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.dump.path") /** * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. @@ -45,23 +44,47 @@ object IdGen { } } - def tagLoreId(sparkPlan: SparkPlan, outputLoreIds: OutputLoreIds, loreOutputRootPath: Path) + def tagForLore(sparkPlan: SparkPlan, rapidsConf: RapidsConf) : SparkPlan = { - sparkPlan.foreachUp { - case g: GpuExec => - nextLoreIdOfSparkPlan(g).foreach { id => - g.setTagValue(LORE_ID_TAG, id.toString) - val currentExecRootPath = new Path(loreOutputRootPath, s"loreId=$id") - g.children.zipWithIndex.foreach { - case (child, idx) => - val childOutputPath = new Path(currentExecRootPath, s"child=$idx") - child.setTagValue(LORE_OUTPUT_PATH_TAG, LoreOutputInfo(childOutputPath, outputLoreIds)) - } + val loreDumpIds = rapidsConf.get(RapidsConf.LORE_DUMP_IDS).map(OutputLoreId.parse) + + loreDumpIds match { + case Some(dumpIds) => + // We need to dump the output of the output of nodes with the lore id in the dump ids + val loreOutputRootPath = rapidsConf.get(RapidsConf.LORE_DUMP_PATH).getOrElse(throw + new IllegalArgumentException(s"${RapidsConf.LORE_DUMP_PATH.key} must be set " + + s"when ${RapidsConf.LORE_DUMP_IDS.key} is set.")) + + sparkPlan.transformUp { + case g: GpuExec => + nextLoreIdOfSparkPlan(g).map { loreId => + g.setTagValue(LORE_ID_TAG, loreId.toString) + + dumpIds.get(loreId).map { outputLoreIds => + val currentExecRootPath = new Path(loreOutputRootPath, s"loreId=$loreId") + g.setTagValue(LORE_DUMP_PATH_TAG, currentExecRootPath.toString) + val newChildren = g.children.zipWithIndex.map { + case (child, idx) => + GpuLoreDumpExec(idx, child, LoreOutputInfo(outputLoreIds, currentExecRootPath)) + } + + g.withNewChildren(newChildren) + }.getOrElse(g) + }.getOrElse(g) + case p => p + } + case None => + // We don't need to dump the output of the nodes, just tag the lore id + sparkPlan.foreachUp { + case g: GpuExec => + nextLoreIdOfSparkPlan(g).foreach { loreId => + g.setTagValue(LORE_ID_TAG, loreId.toString) + } + case _ => } - case _ => - } - sparkPlan + sparkPlan + } } def lordIdOf(node: SparkPlan): Option[String] = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index c97e2419deb..5d2271c0dd6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -1,16 +1,3 @@ -package com.nvidia.spark.rapids.lore - -import com.nvidia.spark.rapids.{DumpUtils, GpuExec} -import org.apache.hadoop.fs.Path - -import org.apache.spark.{OneToOneDependency, Partition, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.util.SerializableConfiguration - /* * Copyright (c) 2024, NVIDIA CORPORATION. * @@ -27,9 +14,22 @@ import org.apache.spark.util.SerializableConfiguration * limitations under the License. */ -case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo, - hadoopConf: SerializableConfiguration) extends UnaryExecNode - with GpuExec { +package com.nvidia.spark.rapids.lore + +import com.nvidia.spark.rapids.{DumpUtils, GpuColumnVector, GpuExec} +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + + +case class GpuLoreDumpExec(idxInParent: Int, child: SparkPlan, loreOutputInfo: LoreOutputInfo) + extends UnaryExecNode with GpuExec { override def output: Seq[Attribute] = child.output override def doExecute(): RDD[InternalRow] = { @@ -38,27 +38,72 @@ case class GpuLoreDumpExec(child: SparkPlan, loreOutputInfo: LoreOutputInfo, override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { val input = child.executeColumnar() - new GpuLoreDumpRDD(input, hadoopConf, loreOutputInfo) + val rdd = new GpuLoreDumpRDD(idxInParent, input, loreOutputInfo) + rdd.saveMeta() + rdd } override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - GpuLoreDumpExec(newChild, loreOutputInfo, hadoopConf) + GpuLoreDumpExec(idxInParent, newChild, loreOutputInfo) } -class GpuLoreDumpRDD(input: RDD[ColumnarBatch], hadoopConf: SerializableConfiguration, - loreOutputInfo: LoreOutputInfo) extends RDD[ColumnarBatch](input.context, - Seq(new OneToOneDependency(input))) { - override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = input - .compute(split, context).zipWithIndex.map { f => - val (batch, index) = f - val outputPath = columnarBatchPath(split, index) - val outputStream = outputPath.getFileSystem(hadoopConf.value).create(outputPath, false) - DumpUtils.dumpToParquet(batch, outputStream) - batch + +class GpuLoreDumpRDD(idxInParent: Int, input: RDD[ColumnarBatch], loreOutputInfo: LoreOutputInfo) + extends RDD[ColumnarBatch](input) with GpuLoreRDD { + override val rootPath: Path = new Path(loreOutputInfo.path, s"input-$idxInParent") + + private val hadoopConf = new SerializableConfiguration(this.context.hadoopConfiguration) + + def saveMeta(): Unit = { + val meta = LoreRDDMeta(input.getNumPartitions, this.getPartitions.map(_.index)) + GpuLore.dumpObject(meta, pathOfMeta, this.context.hadoopConfiguration) + } + + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + if (loreOutputInfo.outputLoreId.shouldOutputPartition(split.index)) { + val originalIter = input.compute(split, context) + new Iterator[ColumnarBatch] { + var batchIdx: Int = -1 + var nextBatch: Option[ColumnarBatch] = None + + loadNextBatch() + + override def hasNext: Boolean = { + nextBatch.isDefined + } + + override def next(): ColumnarBatch = { + val ret = dumpCurrentBatch() + loadNextBatch() + if (!hasNext) { + // This is the last batch, save the partition meta + val partitionMeta = LoreRDDPartitionMeta(batchIdx+1, GpuColumnVector.extractTypes(ret)) + GpuLore.dumpObject(partitionMeta, pathOfPartitionMeta(split.index), hadoopConf.value) + } + ret + } + + private def dumpCurrentBatch(): ColumnarBatch = { + val outputPath = pathOfBatch(split.index, batchIdx) + val outputStream = outputPath.getFileSystem(hadoopConf.value).create(outputPath, false) + DumpUtils.dumpToParquet(nextBatch.get, outputStream) + nextBatch.get + } + + private def loadNextBatch(): Unit = { + if (originalIter.hasNext) { + nextBatch = Some(originalIter.next()) + batchIdx += 1 + } + } + } + } else { + input.compute(split, context) } + } - override protected def getPartitions: Array[Partition] = input.getPartitions + override protected def getPartitions: Array[Partition] = { + input.partitions + } +} - private def columnarBatchPath(partition: Partition, batchIndex: Int): Path = new Path( - loreOutputInfo.path, s"part-${partition.index}/batch-$batchIndex.parquet") -} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala new file mode 100644 index 00000000000..f163828b789 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.lore + +import ai.rapids.cudf.Table +import com.nvidia.spark.rapids.{GpuColumnVector, GpuExec} +import com.nvidia.spark.rapids.Arm.withResource +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration + +case class GpuLoreReplayExec(idxInParent: Int, parentRootPath: Path) extends LeafExecNode + with GpuExec { + override def output: Seq[Attribute] = Nil + + override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException("LoreReplayExec does not support row mode") + } + + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { + new GpuLoreReplayRDD(session.sparkContext, GpuLore.pathOfChild(parentRootPath, idxInParent)) + } +} + +class GpuLoreReplayRDD(sc: SparkContext, override val rootPath: Path) + extends RDD[ColumnarBatch](sc, Nil) with GpuLoreRDD { + private val hadoopConf = new SerializableConfiguration(sc.hadoopConfiguration) + private val loreRDDMeta: LoreRDDMeta = GpuLore.loadObject(pathOfMeta, sc.hadoopConfiguration) + + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val partitionPath = pathOfPartition(split.index) + withResource(partitionPath.getFileSystem(hadoopConf.value)) { fs => + if (!fs.exists(partitionPath)) { + Iterator.empty + } else { + val partitionMeta = GpuLore.loadObject[LoreRDDPartitionMeta](partitionPath, + hadoopConf.value) + new Iterator[ColumnarBatch] { + private var batchIdx: Int = 0 + + override def hasNext: Boolean = { + batchIdx < partitionMeta.numBatches + } + + override def next(): ColumnarBatch = { + val batchPath = pathOfBatch(split.index, batchIdx) + val ret = withResource(batchPath.getFileSystem(hadoopConf.value)) { fs => + if (!fs.exists(batchPath)) { + throw new IllegalStateException(s"Batch file $batchPath does not exist") + } + withResource(fs.open(batchPath)) { fin => + val buffer = IOUtils.toByteArray(fin) + withResource(Table.readParquet(buffer)) { restoredTable => + GpuColumnVector.from(restoredTable, partitionMeta.dataType.toArray) + } + } + + } + batchIdx += 1 + ret + } + } + } + } + } + + override protected def getPartitions: Array[Partition] = { + (0 until loreRDDMeta.numPartitions).map(LoreReplayPartition).toArray + } +} + +case class LoreReplayPartition(override val index: Int) extends Partition diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala new file mode 100644 index 00000000000..72b19204b77 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala @@ -0,0 +1,25 @@ +package com.nvidia.spark.rapids + +import com.nvidia.spark.rapids.lore.GpuLore +import org.apache.hadoop.fs.Path + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.functions.sum + +class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir with Logging { + test("Test aggregate") { + withGpuSparkSession { spark => + val df = spark.range(0, 1000) + .selectExpr("id % 100 as key", "id % 10 as value") + .groupBy("key") + .agg(sum("value").as("total")) + .collect() + + val exec = GpuLore.restoreGpuExec(new Path("/tmp/agg"), + spark.sparkContext.hadoopConfiguration) + .executeCollect() + + assert(df === exec) + } + } +} From f0c0481c7132f41e8c253173635f087eb8d43575 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 17 Jun 2024 15:51:19 +0800 Subject: [PATCH 10/17] Add tests. --- .../spark/rapids/GpuAggregateExec.scala | 4 +- .../com/nvidia/spark/rapids/GpuExec.scala | 22 ++- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../spark/rapids/GpuTransitionOverrides.scala | 4 +- .../nvidia/spark/rapids/lore/GpuLore.scala | 167 +++++++++++++++- .../com/nvidia/spark/rapids/lore/IdGen.scala | 93 --------- .../spark/rapids/lore/OutputLoreId.scala | 2 - .../com/nvidia/spark/rapids/lore/dump.scala | 44 ++--- .../com/nvidia/spark/rapids/lore/replay.scala | 13 +- .../nvidia/spark/rapids/GpuLoreSuite.scala | 25 --- .../spark/rapids/lore/GpuLoreSuite.scala | 180 ++++++++++++++++++ .../rapids/lore}/OutputLoreIdSuite.scala | 19 +- 12 files changed, 401 insertions(+), 176 deletions(-) delete mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala delete mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala rename {sql-plugin/src/test/scala/com/nvidia/spark/rapids => tests/src/test/scala/com/nvidia/spark/rapids/lore}/OutputLoreIdSuite.scala (62%) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index c58d9862be1..fb24a0338b0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1886,9 +1886,9 @@ case class GpuHashAggregateExec( truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { - s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" + s"$nodeName (keys=$keyString, functions=$functionString, output=$outputString)" } else { - s"GpuHashAggregate(keys=$keyString, functions=$functionString)," + + s"$nodeName (keys=$keyString, functions=$functionString)," + s" filters=${aggregateExpressions.map(_.filter)})" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 2745d6fed9a..945bf54775e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.filecache.FileCacheConf -import com.nvidia.spark.rapids.lore.GpuLore -import com.nvidia.spark.rapids.lore.IdGen.{lordIdOf, LORE_DUMP_PATH_TAG} +import com.nvidia.spark.rapids.lore.{GpuLore, GpuLoreDumpRDD} +import com.nvidia.spark.rapids.lore.GpuLore.{lordIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG} import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.hadoop.fs.Path @@ -367,7 +367,7 @@ trait GpuExec extends SparkPlan { final override def doExecuteColumnar(): RDD[ColumnarBatch] = { this.dumpLoreMetaInfo() - val orig = internalDoExecuteColumnar() + val orig = this.dumpLoreRDD(internalDoExecuteColumnar()) val metrics = getTaskMetrics metrics.map { gpuMetrics => // This is ugly, but it reduces the need to change all exec nodes, so we are doing it here @@ -379,10 +379,10 @@ trait GpuExec extends SparkPlan { } override def nodeName: String = { - lordIdOf(this) match { - case Some(loreId) => s"${super.nodeName} [loreId=$loreId]" - case None => s"${super.nodeName}" - } + val loreId = lordIdOf(this) + val lorePath = getTagValue(LORE_DUMP_PATH_TAG) + val loreRDDInfo = getTagValue(LORE_DUMP_RDD_TAG) + s"${super.nodeName} [loreId=$loreId] [lorePath=$lorePath] [loreRDDInfo=$loreRDDInfo]" } private def dumpLoreMetaInfo(): Unit = { @@ -391,5 +391,13 @@ trait GpuExec extends SparkPlan { } } + private def dumpLoreRDD(inner: RDD[ColumnarBatch]): RDD[ColumnarBatch] = { + getTagValue(LORE_DUMP_RDD_TAG).map { info => + val rdd = new GpuLoreDumpRDD(info, inner) + rdd.saveMeta() + rdd + }.getOrElse(inner) + } + protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index c3f1348d6b5..b309f296089 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import ai.rapids.cudf.DType import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF} import com.nvidia.spark.rapids.jni.GpuTimeZoneDB -import com.nvidia.spark.rapids.lore.IdGen +import com.nvidia.spark.rapids.lore.GpuLore import com.nvidia.spark.rapids.shims._ import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta} import org.apache.hadoop.fs.Path @@ -4710,7 +4710,7 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { } val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations) if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { - IdGen.tagForLore(convertedPlan, conf) + GpuLore.tagForLore(convertedPlan, conf) } else { convertedPlan } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 9fa9820b6ab..b404b0df104 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.mutable -import com.nvidia.spark.rapids.lore.IdGen +import com.nvidia.spark.rapids.lore.GpuLore import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl} import org.apache.spark.SparkContext @@ -825,7 +825,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { } if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) { - updatedPlan = IdGen.tagForLore(updatedPlan, rapidsConf) + updatedPlan = GpuLore.tagForLore(updatedPlan, rapidsConf) } if (rapidsConf.logQueryTransformations) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index d92b26fb29e..476c833e974 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -16,18 +16,24 @@ package com.nvidia.spark.rapids.lore +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import java.util.concurrent.atomic.AtomicInteger + import scala.reflect.ClassTag +import com.nvidia.spark.rapids.{GpuColumnarToRowExec, GpuExec, GpuFilterExec, RapidsConf} import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.GpuExec import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkEnv -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, SparkPlan, SQLExecution} +import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExec import org.apache.spark.sql.types.DataType -case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int]) +case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int], attrs: Seq[Attribute]) case class LoreRDDPartitionMeta(numBatches: Int, dataType: Seq[DataType]) @@ -51,6 +57,22 @@ trait GpuLoreRDD { object GpuLore { + /** + * Lore id of a plan node. + */ + val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id") + /** + * When a [[GpuExec]] node has this tag, it means that this node is a root node whose meta and + * input should be dumped. + */ + val LORE_DUMP_PATH_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.dump.path") + /** + * When a [[GpuExec]] node has this tag, it means that this node is a child node whose data + * should be dumped. + */ + val LORE_DUMP_RDD_TAG: TreeNodeTag[LoreDumpRDDInfo] = new TreeNodeTag[LoreDumpRDDInfo]( + "rapids.gpu.lore.dump.rdd.info") + def pathOfRootPlanMeta(rootPath: Path): Path = { new Path(rootPath, "plan.meta") } @@ -82,13 +104,146 @@ object GpuLore { } def pathOfChild(rootPath: Path, childIndex: Int): Path = { - new Path(rootPath, s"child-$childIndex") + new Path(rootPath, s"input-$childIndex") } def restoreGpuExec(rootPath: Path, hadoopConf: Configuration): GpuExec = { val rootExec = loadObject[GpuExec](pathOfRootPlanMeta(rootPath), hadoopConf) + // Load children - rootExec.withNewChildren(rootExec.children.indices.map(GpuLoreReplayExec(_, rootPath))) - .asInstanceOf[GpuExec] + val newChildren = rootExec.children.zipWithIndex.map { case (plan, idx) => + val newChild = GpuLoreReplayExec(idx, rootPath) + plan match { + case b: GpuBroadcastExchangeExec => + b.withNewChildren(Seq(newChild)) + case _ => newChild + } + } + + rootExec match { + case b: GpuFilterExec => + val newExpr = restoreSubqueryExpression(1, b.condition, rootPath) + b.makeCopy(Array(newExpr, newChildren.head)).asInstanceOf[GpuExec] + case _ => rootExec.withNewChildren(newChildren) + .asInstanceOf[GpuExec] + } + } + + private def restoreSubqueryExpression(startIdx: Int, expression: Expression, + rootPath: Path): Expression = { + var nextIdx = startIdx + val newExpr = expression.transformUp { + case sub: ExecSubqueryExpression if sub.plan.child.isInstanceOf[GpuExec] => + var newChild: SparkPlan = GpuLoreReplayExec(nextIdx, rootPath) + if (!sub.plan.supportsColumnar) { + newChild = GpuColumnarToRowExec(newChild) + } + val newSubqueryExec = sub.plan.withNewChildren(Seq(newChild)).asInstanceOf[BaseSubqueryExec] + nextIdx += 1 + sub.withNewPlan(newSubqueryExec) + } + newExpr + } + + /** + * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. + */ + private val idGen: ConcurrentMap[String, AtomicInteger] = + new ConcurrentHashMap[String, AtomicInteger]() + + private def nextLoreIdOfSparkPlan(plan: SparkPlan): Option[Int] = { + // When the execution id is not set, it means there is no actual execution happening, in this + // case we don't need to generate lore id. + Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) + .map { executionId => + idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() + } + } + + def tagForLore(sparkPlan: SparkPlan, rapidsConf: RapidsConf): SparkPlan = { + val loreDumpIds = rapidsConf.get(RapidsConf.LORE_DUMP_IDS).map(OutputLoreId.parse) + + val newPlan = loreDumpIds match { + case Some(dumpIds) => + println(s"Dumping lore output for $dumpIds") + // We need to dump the output of the output of nodes with the lore id in the dump ids + val loreOutputRootPath = rapidsConf.get(RapidsConf.LORE_DUMP_PATH).getOrElse(throw + new IllegalArgumentException(s"${RapidsConf.LORE_DUMP_PATH.key} must be set " + + s"when ${RapidsConf.LORE_DUMP_IDS.key} is set.")) + println(s"Dumping lore output to $loreOutputRootPath") + + sparkPlan.foreachUp { + case g: GpuExec if !g.isInstanceOf[GpuLoreReplayExec] => + nextLoreIdOfSparkPlan(g).foreach { loreId => + g.setTagValue(LORE_ID_TAG, loreId.toString) + + dumpIds.get(loreId).foreach { outputLoreIds => + val currentExecRootPath = new Path(loreOutputRootPath, s"loreId-$loreId") + g.setTagValue(LORE_DUMP_PATH_TAG, currentExecRootPath.toString) + val loreOutputInfo = LoreOutputInfo(outputLoreIds, + currentExecRootPath) + + g.children.zipWithIndex.foreach { + case (child, idx) => + val dumpRDDInfo = LoreDumpRDDInfo(idx, loreOutputInfo, child.output) + child match { + case c: GpuBroadcastExchangeExec => + c.child.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) + case o => o.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) + } + } + + g match { + case f: GpuFilterExec => + tagForSubqueryPlan(1, f.condition, loreOutputInfo) + case _ => + } + } + } + case _ => + } + + sparkPlan + case None => + // We don't need to dump the output of the nodes, just tag the lore id + sparkPlan.foreachUp { + case g: GpuExec => + nextLoreIdOfSparkPlan(g).foreach { loreId => + g.setTagValue(LORE_ID_TAG, loreId.toString) + } + case _ => + } + + sparkPlan + } + + newPlan.foreachUp(node => println(s"${node.verboseString(1000)}")) + newPlan + } + + def lordIdOf(node: SparkPlan): Option[String] = { + node.getTagValue(LORE_ID_TAG) + } + + private def tagForSubqueryPlan(startId: Int, expression: Expression, + loreOutputInfo: LoreOutputInfo): Int = { + var nextPlanId = startId + expression.foreachUp { + case sub: ExecSubqueryExpression => + if (sub.plan.child.isInstanceOf[GpuExec]) { + val dumpRDDInfo = LoreDumpRDDInfo(nextPlanId, loreOutputInfo, sub.plan.child.output) + println(s"Tagging subquery plan with $dumpRDDInfo, ${sub.plan.child.verboseString(1000)}") + sub.plan.child match { + case p: GpuColumnarToRowExec => p.child.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) + case c => c.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) + } + + nextPlanId += 1 + } else { + throw new IllegalArgumentException(s"Subquery plan ${sub.plan} is not a GpuExec") + } + case _ => + } + nextPlanId } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala deleted file mode 100644 index 8f0ca027bb1..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/IdGen.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.lore - -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} -import java.util.concurrent.atomic.AtomicInteger - -import com.nvidia.spark.rapids.{GpuExec, RapidsConf} -import org.apache.hadoop.fs.Path - -import org.apache.spark.sql.catalyst.trees.TreeNodeTag -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} - -object IdGen { - val LORE_ID_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.id") - val LORE_DUMP_PATH_TAG: TreeNodeTag[String] = new TreeNodeTag[String]("rapids.gpu.lore.dump.path") - - /** - * Lore id generator. Key is [[SQLExecution.EXECUTION_ID_KEY]]. - */ - private val idGen: ConcurrentMap[String, AtomicInteger] = - new ConcurrentHashMap[String, AtomicInteger]() - - private def nextLoreIdOfSparkPlan(plan: SparkPlan): Option[Int] = { - // When the execution id is not set, it means there is no actual execution happening, in this - // case we don't need to generate lore id. - Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) - .map { executionId => - idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() - } - } - - def tagForLore(sparkPlan: SparkPlan, rapidsConf: RapidsConf) - : SparkPlan = { - val loreDumpIds = rapidsConf.get(RapidsConf.LORE_DUMP_IDS).map(OutputLoreId.parse) - - loreDumpIds match { - case Some(dumpIds) => - // We need to dump the output of the output of nodes with the lore id in the dump ids - val loreOutputRootPath = rapidsConf.get(RapidsConf.LORE_DUMP_PATH).getOrElse(throw - new IllegalArgumentException(s"${RapidsConf.LORE_DUMP_PATH.key} must be set " + - s"when ${RapidsConf.LORE_DUMP_IDS.key} is set.")) - - sparkPlan.transformUp { - case g: GpuExec => - nextLoreIdOfSparkPlan(g).map { loreId => - g.setTagValue(LORE_ID_TAG, loreId.toString) - - dumpIds.get(loreId).map { outputLoreIds => - val currentExecRootPath = new Path(loreOutputRootPath, s"loreId=$loreId") - g.setTagValue(LORE_DUMP_PATH_TAG, currentExecRootPath.toString) - val newChildren = g.children.zipWithIndex.map { - case (child, idx) => - GpuLoreDumpExec(idx, child, LoreOutputInfo(outputLoreIds, currentExecRootPath)) - } - - g.withNewChildren(newChildren) - }.getOrElse(g) - }.getOrElse(g) - case p => p - } - case None => - // We don't need to dump the output of the nodes, just tag the lore id - sparkPlan.foreachUp { - case g: GpuExec => - nextLoreIdOfSparkPlan(g).foreach { loreId => - g.setTagValue(LORE_ID_TAG, loreId.toString) - } - case _ => - } - - sparkPlan - } - } - - def lordIdOf(node: SparkPlan): Option[String] = { - node.getTagValue(LORE_ID_TAG) - } -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala index 2c2ce287f89..3d97f2784a3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala @@ -41,13 +41,11 @@ object OutputLoreId { def apply(loreId: Int): OutputLoreId = OutputLoreId(loreId, Set.empty) def apply(inputStr: String): OutputLoreId = { - println("inputStr for output lore id: " + inputStr) OUTPUT_LORE_ID_REGEX.findFirstMatchIn(inputStr).map { m => val loreId = m.group("loreId").toInt val partitionIds: Set[Int] = m.group("partitionIds") match { case null => Set.empty case partitionIdsStr => - println("partitionIdsStr: " + partitionIdsStr) PARTITION_ID_SEP_REGEX.split(partitionIdsStr).flatMap { case PARTITION_ID_REGEX(partitionId) => Seq(partitionId.toInt) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index 5d2271c0dd6..2e176a4973a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -16,59 +16,41 @@ package com.nvidia.spark.rapids.lore -import com.nvidia.spark.rapids.{DumpUtils, GpuColumnVector, GpuExec} +import com.nvidia.spark.rapids.{DumpUtils, GpuColumnVector} +import com.nvidia.spark.rapids.lore.GpuLore.pathOfChild import org.apache.hadoop.fs.Path import org.apache.spark.{Partition, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration -case class GpuLoreDumpExec(idxInParent: Int, child: SparkPlan, loreOutputInfo: LoreOutputInfo) - extends UnaryExecNode with GpuExec { - override def output: Seq[Attribute] = child.output +case class LoreDumpRDDInfo(idxInParent: Int, loreOutputInfo: LoreOutputInfo, attrs: Seq[Attribute]) - override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException("GpuLoreDumpExec does not support row mode") - } - - override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { - val input = child.executeColumnar() - val rdd = new GpuLoreDumpRDD(idxInParent, input, loreOutputInfo) - rdd.saveMeta() - rdd - } - - override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = - GpuLoreDumpExec(idxInParent, newChild, loreOutputInfo) -} - - -class GpuLoreDumpRDD(idxInParent: Int, input: RDD[ColumnarBatch], loreOutputInfo: LoreOutputInfo) +class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) extends RDD[ColumnarBatch](input) with GpuLoreRDD { - override val rootPath: Path = new Path(loreOutputInfo.path, s"input-$idxInParent") + override val rootPath: Path = pathOfChild(info.loreOutputInfo.path, info.idxInParent) private val hadoopConf = new SerializableConfiguration(this.context.hadoopConfiguration) def saveMeta(): Unit = { - val meta = LoreRDDMeta(input.getNumPartitions, this.getPartitions.map(_.index)) + val meta = LoreRDDMeta(input.getNumPartitions, this.getPartitions.map(_.index), info.attrs) GpuLore.dumpObject(meta, pathOfMeta, this.context.hadoopConfiguration) } override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { - if (loreOutputInfo.outputLoreId.shouldOutputPartition(split.index)) { + if (info.loreOutputInfo.outputLoreId.shouldOutputPartition(split.index)) { val originalIter = input.compute(split, context) new Iterator[ColumnarBatch] { var batchIdx: Int = -1 var nextBatch: Option[ColumnarBatch] = None - loadNextBatch() - override def hasNext: Boolean = { + if (batchIdx == -1) { + loadNextBatch() + } nextBatch.isDefined } @@ -77,7 +59,7 @@ class GpuLoreDumpRDD(idxInParent: Int, input: RDD[ColumnarBatch], loreOutputInfo loadNextBatch() if (!hasNext) { // This is the last batch, save the partition meta - val partitionMeta = LoreRDDPartitionMeta(batchIdx+1, GpuColumnVector.extractTypes(ret)) + val partitionMeta = LoreRDDPartitionMeta(batchIdx, GpuColumnVector.extractTypes(ret)) GpuLore.dumpObject(partitionMeta, pathOfPartitionMeta(split.index), hadoopConf.value) } ret @@ -93,8 +75,10 @@ class GpuLoreDumpRDD(idxInParent: Int, input: RDD[ColumnarBatch], loreOutputInfo private def loadNextBatch(): Unit = { if (originalIter.hasNext) { nextBatch = Some(originalIter.next()) - batchIdx += 1 + } else { + nextBatch = None } + batchIdx += 1 } } } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala index f163828b789..d5799349937 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala @@ -32,21 +32,24 @@ import org.apache.spark.util.SerializableConfiguration case class GpuLoreReplayExec(idxInParent: Int, parentRootPath: Path) extends LeafExecNode with GpuExec { - override def output: Seq[Attribute] = Nil + private lazy val rdd = new GpuLoreReplayRDD(session.sparkContext, + GpuLore.pathOfChild(parentRootPath, idxInParent)) + override def output: Seq[Attribute] = rdd.loreRDDMeta.attrs override def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException("LoreReplayExec does not support row mode") } override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { - new GpuLoreReplayRDD(session.sparkContext, GpuLore.pathOfChild(parentRootPath, idxInParent)) + rdd } } class GpuLoreReplayRDD(sc: SparkContext, override val rootPath: Path) extends RDD[ColumnarBatch](sc, Nil) with GpuLoreRDD { private val hadoopConf = new SerializableConfiguration(sc.hadoopConfiguration) - private val loreRDDMeta: LoreRDDMeta = GpuLore.loadObject(pathOfMeta, sc.hadoopConfiguration) + private[lore] val loreRDDMeta: LoreRDDMeta = GpuLore.loadObject(pathOfMeta, sc + .hadoopConfiguration) override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { val partitionPath = pathOfPartition(split.index) @@ -54,8 +57,8 @@ class GpuLoreReplayRDD(sc: SparkContext, override val rootPath: Path) if (!fs.exists(partitionPath)) { Iterator.empty } else { - val partitionMeta = GpuLore.loadObject[LoreRDDPartitionMeta](partitionPath, - hadoopConf.value) + val partitionMeta = GpuLore.loadObject[LoreRDDPartitionMeta]( + pathOfPartitionMeta(split.index), hadoopConf.value) new Iterator[ColumnarBatch] { private var batchIdx: Int = 0 diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala deleted file mode 100644 index 72b19204b77..00000000000 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuLoreSuite.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.nvidia.spark.rapids - -import com.nvidia.spark.rapids.lore.GpuLore -import org.apache.hadoop.fs.Path - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.functions.sum - -class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir with Logging { - test("Test aggregate") { - withGpuSparkSession { spark => - val df = spark.range(0, 1000) - .selectExpr("id % 100 as key", "id % 10 as value") - .groupBy("key") - .agg(sum("value").as("total")) - .collect() - - val exec = GpuLore.restoreGpuExec(new Path("/tmp/agg"), - spark.sparkContext.hadoopConfiguration) - .executeCollect() - - assert(df === exec) - } - } -} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala new file mode 100644 index 00000000000..556549fbc14 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.lore + +import com.nvidia.spark.rapids.{FunSuiteWithTempDir, GpuColumnarToRowExec, RapidsConf, SparkQueryCompareTestSuite} +import org.apache.hadoop.fs.Path + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.functions +import org.apache.spark.sql.internal.SQLConf + +class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir with Logging { + test("Test aggregate replay") { + withGpuSparkSession{ spark => + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "10") + + val df = spark.range(0, 1000, 1, 100) + .selectExpr("id % 10 as key", "id % 100 as value") + .groupBy("key") + .agg(functions.sum("value").as("total")) + + val res = df.collect().length + + + val restoredRes = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-10"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(res == restoredRes) + } + } + + test("Test broadcast join replay") { + withGpuSparkSession{ spark => + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "32") + + val df1 = spark.range(0, 1000, 1, 10) + .selectExpr("id % 10 as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) + .selectExpr("(id % 10 + 5) as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + + val df = df1.join(df2, Seq("key")) + + val res = df.collect().length + + + val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-32"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(res == resCount) + } + } + + test("Test subquery") { + withGpuSparkSession{ spark => + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "13") + + spark.range(0, 100, 1, 10) + .createTempView("df1") + + spark.range(50, 1000, 1, 10) + .createTempView("df2") + + val df = spark.sql("select * from df1 where id > (select max(id) from df2)") + + val res = df.collect().length + + + val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-13"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(res == resCount) + } + } + + test("Test no broadcast join replay") { + withGpuSparkSession{ spark => + spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "30") + + val df1 = spark.range(0, 1000, 1, 10) + .selectExpr("id % 10 as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) + .selectExpr("(id % 10 + 5) as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + + val df = df1.join(df2, Seq("key")) + + val res = df.collect().length + + + val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-30"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(res == resCount) + } + } + + test("Test AQE replay") { + withGpuSparkSession{ spark => + spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "77") + + val df1 = spark.range(0, 1000, 1, 10) + .selectExpr("id % 10 as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) + .selectExpr("(id % 10 + 5) as key", "id % 100 as value") + .groupBy("key").agg(functions.sum("value").as("count")) + + val df = df1.join(df2, Seq("key")) + + val res = df.collect().length + + + val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-77"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(res == resCount) + } + } + + test("Partition only") { + withGpuSparkSession{ spark => + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "3[0 2]") + + val df = spark.range(0, 1000, 1, 100) + .selectExpr("id % 10 as key", "id % 100 as value") + + val res = df.collect().length + println(s"Length of original: $res") + + + val restoredRes = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-3"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(20 == restoredRes) + } + } +} diff --git a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/lore/OutputLoreIdSuite.scala similarity index 62% rename from sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala rename to tests/src/test/scala/com/nvidia/spark/rapids/lore/OutputLoreIdSuite.scala index 69632a1e06e..d57f895c950 100644 --- a/sql-plugin/src/test/scala/com/nvidia/spark/rapids/OutputLoreIdSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/lore/OutputLoreIdSuite.scala @@ -1,6 +1,21 @@ -package com.nvidia.spark.rapids +/* + * Copyright (c) 2020-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.lore -import com.nvidia.spark.rapids.lore.OutputLoreId import org.scalatest.funsuite.AnyFunSuite class OutputLoreIdSuite extends AnyFunSuite { From 6054ab9314e84de53f601cc6b4cd64eb19c35a0e Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 19 Jun 2024 18:01:22 +0800 Subject: [PATCH 11/17] Rename test case Signed-off-by: liurenjie1024 --- .../com/nvidia/spark/rapids/lore/GpuLoreSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala index 556549fbc14..3755b333078 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.functions import org.apache.spark.sql.internal.SQLConf class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir with Logging { - test("Test aggregate replay") { + test("Aggregate") { withGpuSparkSession{ spark => spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "10") @@ -47,7 +47,7 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w } } - test("Test broadcast join replay") { + test("Broadcast join") { withGpuSparkSession{ spark => spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "32") @@ -74,7 +74,7 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w } } - test("Test subquery") { + test("Subquery") { withGpuSparkSession{ spark => spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "13") @@ -100,7 +100,7 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w } } - test("Test no broadcast join replay") { + test("No broadcast join") { withGpuSparkSession{ spark => spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) @@ -128,7 +128,7 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w } } - test("Test AQE replay") { + test("AQE") { withGpuSparkSession{ spark => spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) From 64d3e398195453b511a22404f37e4d856811f6d5 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 24 Jun 2024 15:12:11 +0800 Subject: [PATCH 12/17] Fix AQE test --- .../com/nvidia/spark/rapids/GpuExec.scala | 2 +- .../nvidia/spark/rapids/lore/GpuLore.scala | 34 +++-- .../com/nvidia/spark/rapids/lore/dump.scala | 14 +- .../nvidia/spark/rapids/lore/package.scala | 32 ++++ .../execution/GpuBroadcastExchangeExec.scala | 20 ++- .../spark/rapids/lore/GpuLoreSuite.scala | 140 +++++++----------- 6 files changed, 142 insertions(+), 100 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 945bf54775e..47e541f48ea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -391,7 +391,7 @@ trait GpuExec extends SparkPlan { } } - private def dumpLoreRDD(inner: RDD[ColumnarBatch]): RDD[ColumnarBatch] = { + protected def dumpLoreRDD(inner: RDD[ColumnarBatch]): RDD[ColumnarBatch] = { getTagValue(LORE_DUMP_RDD_TAG).map { info => val rdd = new GpuLoreDumpRDD(info, inner) rdd.saveMeta() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index 476c833e974..bc156fe5636 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -23,14 +23,16 @@ import scala.reflect.ClassTag import com.nvidia.spark.rapids.{GpuColumnarToRowExec, GpuExec, GpuFilterExec, RapidsConf} import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.shims.ShimLeafExecNode import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - import org.apache.spark.SparkEnv + import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, SparkPlan, SQLExecution} -import org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExec +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuCustomShuffleReaderExec} import org.apache.spark.sql.types.DataType case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int], attrs: Seq[Attribute]) @@ -116,6 +118,8 @@ object GpuLore { plan match { case b: GpuBroadcastExchangeExec => b.withNewChildren(Seq(newChild)) + case b: BroadcastQueryStageExec => + b.broadcast.withNewChildren(Seq(newChild)) case _ => newChild } } @@ -151,7 +155,7 @@ object GpuLore { private val idGen: ConcurrentMap[String, AtomicInteger] = new ConcurrentHashMap[String, AtomicInteger]() - private def nextLoreIdOfSparkPlan(plan: SparkPlan): Option[Int] = { + private def nextLoreIdOf(plan: SparkPlan): Option[Int] = { // When the execution id is not set, it means there is no actual execution happening, in this // case we don't need to generate lore id. Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) @@ -165,19 +169,18 @@ object GpuLore { val newPlan = loreDumpIds match { case Some(dumpIds) => - println(s"Dumping lore output for $dumpIds") // We need to dump the output of the output of nodes with the lore id in the dump ids val loreOutputRootPath = rapidsConf.get(RapidsConf.LORE_DUMP_PATH).getOrElse(throw new IllegalArgumentException(s"${RapidsConf.LORE_DUMP_PATH.key} must be set " + s"when ${RapidsConf.LORE_DUMP_IDS.key} is set.")) - println(s"Dumping lore output to $loreOutputRootPath") sparkPlan.foreachUp { - case g: GpuExec if !g.isInstanceOf[GpuLoreReplayExec] => - nextLoreIdOfSparkPlan(g).foreach { loreId => + case g: GpuExec => + nextLoreIdOf(g).foreach { loreId => g.setTagValue(LORE_ID_TAG, loreId.toString) dumpIds.get(loreId).foreach { outputLoreIds => + checkUnsupportedOperator(g) val currentExecRootPath = new Path(loreOutputRootPath, s"loreId-$loreId") g.setTagValue(LORE_DUMP_PATH_TAG, currentExecRootPath.toString) val loreOutputInfo = LoreOutputInfo(outputLoreIds, @@ -187,8 +190,8 @@ object GpuLore { case (child, idx) => val dumpRDDInfo = LoreDumpRDDInfo(idx, loreOutputInfo, child.output) child match { - case c: GpuBroadcastExchangeExec => - c.child.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) + case c: BroadcastQueryStageExec => + c.broadcast.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) case o => o.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) } } @@ -208,7 +211,7 @@ object GpuLore { // We don't need to dump the output of the nodes, just tag the lore id sparkPlan.foreachUp { case g: GpuExec => - nextLoreIdOfSparkPlan(g).foreach { loreId => + nextLoreIdOf(g).foreach { loreId => g.setTagValue(LORE_ID_TAG, loreId.toString) } case _ => @@ -217,7 +220,6 @@ object GpuLore { sparkPlan } - newPlan.foreachUp(node => println(s"${node.verboseString(1000)}")) newPlan } @@ -232,7 +234,6 @@ object GpuLore { case sub: ExecSubqueryExpression => if (sub.plan.child.isInstanceOf[GpuExec]) { val dumpRDDInfo = LoreDumpRDDInfo(nextPlanId, loreOutputInfo, sub.plan.child.output) - println(s"Tagging subquery plan with $dumpRDDInfo, ${sub.plan.child.verboseString(1000)}") sub.plan.child match { case p: GpuColumnarToRowExec => p.child.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) case c => c.setTagValue(LORE_DUMP_RDD_TAG, dumpRDDInfo) @@ -246,4 +247,13 @@ object GpuLore { } nextPlanId } + + private def checkUnsupportedOperator(plan: SparkPlan): Unit = { + if (plan.isInstanceOf[ShimLeafExecNode] || + plan.isInstanceOf[GpuCustomShuffleReaderExec] + ) { + throw new UnsupportedOperationException(s"Currently we don't support dumping input of " + + s"${plan.getClass.getSimpleName} operator.") + } + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index 2e176a4973a..bcab79a54f2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -17,12 +17,16 @@ package com.nvidia.spark.rapids.lore import com.nvidia.spark.rapids.{DumpUtils, GpuColumnVector} +import com.nvidia.spark.rapids.GpuCoalesceExec.EmptyPartition import com.nvidia.spark.rapids.lore.GpuLore.pathOfChild import org.apache.hadoop.fs.Path -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.rapids.execution.GpuBroadcastHelper +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -91,3 +95,11 @@ class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) } } +class SimpleRDD(_sc: SparkContext, data: Broadcast[Any], schema: StructType) extends + RDD[ColumnarBatch](_sc, Nil) { + override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + Seq(GpuBroadcastHelper.getBroadcastBatch(data, schema)).iterator + } + + override protected def getPartitions: Array[Partition] = Array(EmptyPartition(0)) +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala index 6ca048e2f5a..5fb03454c6b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/package.scala @@ -16,6 +16,38 @@ package com.nvidia.spark.rapids +/** + * Lore framework is used for dumping input data of a gpu executor to disk so that it can be + * replayed in local environment for performance analysis. + *
+ * When [[RapidsConf.TAG_LORE_ID_ENABLED]] is set, during the planning phase we will tag a lore + * id to each gpu operator. Lore id is guaranteed to be unique within a query, and it's supposed + * to be same for operators with same plan. + *
+ * When [[RapidsConf.LORE_DUMP_IDS]] is set, during the execution phase we will dump the input + * data of gpu operators with lore id to disk. The dumped data can be replayed in local + * environment. The dumped data will reside in [[RapidsConf.LORE_DUMP_PATH]], and typically will + * following directory hierarchy: + * {{{ + * loreId-10/ + * input-0/ + * rdd.meta + * partition-0/ + * partition.meta + * batch-0.parquet + * batch-1.parquet + * partition-1/ + * partition.meta + * batch-0.parquet + * + *loreId-15/ + * input-0/ + * rdd.meta + * partition-0/ + * partition.meta + * batch-0.parquet + * }}} + */ package object lore { type LoreId = Int type OutputLoreIds = Map[LoreId, OutputLoreId] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala index 51c6f52d97e..e2ec7d99dbb 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala @@ -31,9 +31,11 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.lore.GpuLore.LORE_DUMP_RDD_TAG +import com.nvidia.spark.rapids.lore.{GpuLoreDumpRDD, SimpleRDD} import com.nvidia.spark.rapids.shims.{ShimBroadcastExchangeLike, ShimUnaryExecNode, SparkShimImpl} - import org.apache.spark.SparkException + import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher @@ -486,7 +488,9 @@ abstract class GpuBroadcastExchangeExecBase( throw new IllegalStateException("A canonicalized plan is not supposed to be executed.") } try { - relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[Broadcast[T]] + val ret = relationFuture.get(timeout, TimeUnit.SECONDS) + doLoreDump(ret) + ret.asInstanceOf[Broadcast[T]] } catch { case ex: TimeoutException => logError(s"Could not execute broadcast in $timeout secs.", ex) @@ -501,6 +505,18 @@ abstract class GpuBroadcastExchangeExecBase( } } + // We have to do this explicitly here rather than similar to the general version one in + // [[GpuExec]] since in adaptive execution, the broadcast value has already been calculated + // before we tag this plan to dump. + private def doLoreDump(result: Broadcast[Any]): Unit = { + val inner = new SimpleRDD(session.sparkContext, result, schema) + getTagValue(LORE_DUMP_RDD_TAG).foreach { info => + val rdd = new GpuLoreDumpRDD(info, inner) + rdd.saveMeta() + rdd.foreach(_.close()) + } + } + override def runtimeStatistics: Statistics = { Statistics( sizeInBytes = metrics("dataSize").value, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala index 3755b333078..ae109746b8e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/lore/GpuLoreSuite.scala @@ -20,139 +20,91 @@ import com.nvidia.spark.rapids.{FunSuiteWithTempDir, GpuColumnarToRowExec, Rapid import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.functions +import org.apache.spark.sql.{functions, DataFrame, SparkSession} import org.apache.spark.sql.internal.SQLConf class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir with Logging { test("Aggregate") { - withGpuSparkSession{ spark => - spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) - spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "10") - - val df = spark.range(0, 1000, 1, 100) + doTestReplay("10") { spark => + spark.range(0, 1000, 1, 100) .selectExpr("id % 10 as key", "id % 100 as value") .groupBy("key") .agg(functions.sum("value").as("total")) - - val res = df.collect().length - - - val restoredRes = GpuColumnarToRowExec(GpuLore.restoreGpuExec( - new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-10"), - spark.sparkContext.hadoopConfiguration)) - .executeCollect() - .length - - assert(res == restoredRes) } } test("Broadcast join") { - withGpuSparkSession{ spark => - spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) - spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "32") - + doTestReplay("32") { spark => val df1 = spark.range(0, 1000, 1, 10) .selectExpr("id % 10 as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) + .groupBy("key") + .agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) .selectExpr("(id % 10 + 5) as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) - - val df = df1.join(df2, Seq("key")) - - val res = df.collect().length - - - val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( - new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-32"), - spark.sparkContext.hadoopConfiguration)) - .executeCollect() - .length + .groupBy("key") + .agg(functions.sum("value").as("count")) - assert(res == resCount) + df1.join(df2, Seq("key")) } } test("Subquery") { - withGpuSparkSession{ spark => - spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) - spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "13") - + doTestReplay("13") { spark => spark.range(0, 100, 1, 10) .createTempView("df1") spark.range(50, 1000, 1, 10) .createTempView("df2") - val df = spark.sql("select * from df1 where id > (select max(id) from df2)") - - val res = df.collect().length - - - val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( - new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-13"), - spark.sparkContext.hadoopConfiguration)) - .executeCollect() - .length - - assert(res == resCount) + spark.sql("select * from df1 where id > (select max(id) from df2)") } } test("No broadcast join") { - withGpuSparkSession{ spark => + doTestReplay("30") { spark => spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") - spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) - spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "30") val df1 = spark.range(0, 1000, 1, 10) .selectExpr("id % 10 as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) + .groupBy("key") + .agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) .selectExpr("(id % 10 + 5) as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) - - val df = df1.join(df2, Seq("key")) - - val res = df.collect().length - - - val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( - new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-30"), - spark.sparkContext.hadoopConfiguration)) - .executeCollect() - .length + .groupBy("key") + .agg(functions.sum("value").as("count")) - assert(res == resCount) + df1.join(df2, Seq("key")) } } - test("AQE") { - withGpuSparkSession{ spark => + test("AQE broadcast") { + doTestReplay("90") { spark => spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") - spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) - spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, "77") val df1 = spark.range(0, 1000, 1, 10) .selectExpr("id % 10 as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) + .groupBy("key") + .agg(functions.sum("value").as("count")) + val df2 = spark.range(0, 1000, 1, 10) .selectExpr("(id % 10 + 5) as key", "id % 100 as value") - .groupBy("key").agg(functions.sum("value").as("count")) - - val df = df1.join(df2, Seq("key")) - - val res = df.collect().length + .groupBy("key") + .agg(functions.sum("value").as("count")) + df1.join(df2, Seq("key")) + } + } - val resCount = GpuColumnarToRowExec(GpuLore.restoreGpuExec( - new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-77"), - spark.sparkContext.hadoopConfiguration)) - .executeCollect() - .length + test("AQE Exchange") { + doTestReplay("28") { spark => + spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") - assert(res == resCount) + spark.range(0, 1000, 1, 100) + .selectExpr("id % 10 as key", "id % 100 as value") + .groupBy("key") + .agg(functions.sum("value").as("total")) } } @@ -177,4 +129,24 @@ class GpuLoreSuite extends SparkQueryCompareTestSuite with FunSuiteWithTempDir w assert(20 == restoredRes) } } + + private def doTestReplay(loreDumpIds: String)(dfFunc: SparkSession => DataFrame) = { + val loreId = OutputLoreId.parse(loreDumpIds).head._1 + withGpuSparkSession { spark => + spark.conf.set(RapidsConf.LORE_DUMP_PATH.key, TEST_FILES_ROOT.getAbsolutePath) + spark.conf.set(RapidsConf.LORE_DUMP_IDS.key, loreDumpIds) + + val df = dfFunc(spark) + + val expectedLength = df.collect().length + + val restoredResultLength = GpuColumnarToRowExec(GpuLore.restoreGpuExec( + new Path(s"${TEST_FILES_ROOT.getAbsolutePath}/loreId-$loreId"), + spark.sparkContext.hadoopConfiguration)) + .executeCollect() + .length + + assert(expectedLength == restoredResultLength) + } + } } From b4610cd6519e0d059bb78326db236cde69e7156e Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Tue, 25 Jun 2024 17:55:36 +0800 Subject: [PATCH 13/17] Fix style --- docs/additional-functionality/advanced_configs.md | 10 ++++++++++ .../scala/com/nvidia/spark/rapids/lore/GpuLore.scala | 11 +++++++---- .../scala/com/nvidia/spark/rapids/lore/replay.scala | 2 +- .../rapids/execution/GpuBroadcastExchangeExec.scala | 4 ++-- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 941ab4046e6..7ef72a381d0 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -135,6 +135,16 @@ Name | Description | Default Value | Applicable at spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime +spark.rapids.sql.lore.dumpPath|The path to dump the lore nodes' input data. This must be set if spark.rapids.sql.lore.idsToDump has +been set.|None|Runtime +spark.rapids.sql.lore.idsToDump|Specify the lore ids of operators to dump. The format is a comma separated list of +lore ids. For example: "1,2,3" will dump the gpu exec nodes with lore ids 1, 2, and 3. +By default, all partitions of operators' input will be dumped. If you want to dump only +some partitions, you can specify the partition index after the lore id, e.g. 1[0-2 4-5 +7], 2[0 4 5-8] , will dump partitions 0, 1, 2, 4, 5 and 7 of the operator with lore id + 1, and partitions 0, 4, 5, 6, 7, 8 of the operator with lore id 2. +If this is not set, no lore nodes will be dumped.|None|Runtime +spark.rapids.sql.lore.tag.enabled|Enable add a lore id to each gpu plan node|true|Runtime spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index bc156fe5636..be2c7968189 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -23,11 +23,11 @@ import scala.reflect.ClassTag import com.nvidia.spark.rapids.{GpuColumnarToRowExec, GpuExec, GpuFilterExec, RapidsConf} import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.shims.ShimLeafExecNode +import com.nvidia.spark.rapids.shims.{ShimLeafExecNode, SparkShimImpl} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.SparkEnv +import org.apache.spark.SparkEnv import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, SparkPlan, SQLExecution} @@ -80,7 +80,8 @@ object GpuLore { } def dumpPlan[T <: SparkPlan : ClassTag](plan: T, rootPath: Path): Unit = { - dumpObject(plan, pathOfRootPlanMeta(rootPath), plan.session.sparkContext.hadoopConfiguration) + dumpObject(plan, pathOfRootPlanMeta(rootPath), + SparkShimImpl.sessionFromPlan(plan).sparkContext.hadoopConfiguration) } def dumpObject[T: ClassTag](obj: T, path: Path, hadoopConf: Configuration): Unit = { @@ -158,7 +159,9 @@ object GpuLore { private def nextLoreIdOf(plan: SparkPlan): Option[Int] = { // When the execution id is not set, it means there is no actual execution happening, in this // case we don't need to generate lore id. - Option(plan.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) + Option(SparkShimImpl.sessionFromPlan(plan) + .sparkContext + .getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) .map { executionId => idGen.computeIfAbsent(executionId, _ => new AtomicInteger(0)).getAndIncrement() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala index d5799349937..e58297c4ba6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/replay.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.SerializableConfiguration case class GpuLoreReplayExec(idxInParent: Int, parentRootPath: Path) extends LeafExecNode with GpuExec { - private lazy val rdd = new GpuLoreReplayRDD(session.sparkContext, + private lazy val rdd = new GpuLoreReplayRDD(sparkSession.sparkContext, GpuLore.pathOfChild(parentRootPath, idxInParent)) override def output: Seq[Attribute] = rdd.loreRDDMeta.attrs diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala index e2ec7d99dbb..bd30459d63e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala @@ -31,11 +31,11 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.lore.GpuLore.LORE_DUMP_RDD_TAG import com.nvidia.spark.rapids.lore.{GpuLoreDumpRDD, SimpleRDD} +import com.nvidia.spark.rapids.lore.GpuLore.LORE_DUMP_RDD_TAG import com.nvidia.spark.rapids.shims.{ShimBroadcastExchangeLike, ShimUnaryExecNode, SparkShimImpl} -import org.apache.spark.SparkException +import org.apache.spark.SparkException import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher From 7730d323cceb4c636a96b09148a50e24c2675eee Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 26 Jun 2024 14:44:14 +0800 Subject: [PATCH 14/17] Use args to display lore info. --- .../nvidia/spark/rapids/GpuAggregateExec.scala | 1 + .../scala/com/nvidia/spark/rapids/GpuExec.scala | 15 +++++++++------ .../com/nvidia/spark/rapids/lore/GpuLore.scala | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index fb24a0338b0..6d79dec447a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -1758,6 +1758,7 @@ case class GpuHashAggregateExec( |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} |${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)} |${ExplainUtils.generateFieldString("Results", resultExpressions)} + |Lore: ${loreArgs.mkString(", ")} |""".stripMargin } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 47e541f48ea..d3eec3f83b2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -20,7 +20,7 @@ import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.filecache.FileCacheConf import com.nvidia.spark.rapids.lore.{GpuLore, GpuLoreDumpRDD} -import com.nvidia.spark.rapids.lore.GpuLore.{lordIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG} +import com.nvidia.spark.rapids.lore.GpuLore.{loreIdOf, LORE_DUMP_PATH_TAG, LORE_DUMP_RDD_TAG} import com.nvidia.spark.rapids.shims.SparkShimImpl import org.apache.hadoop.fs.Path @@ -378,11 +378,14 @@ trait GpuExec extends SparkPlan { }.getOrElse(orig) } - override def nodeName: String = { - val loreId = lordIdOf(this) - val lorePath = getTagValue(LORE_DUMP_PATH_TAG) - val loreRDDInfo = getTagValue(LORE_DUMP_RDD_TAG) - s"${super.nodeName} [loreId=$loreId] [lorePath=$lorePath] [loreRDDInfo=$loreRDDInfo]" + override def stringArgs: Iterator[Any] = super.stringArgs ++ loreArgs + + protected def loreArgs: Iterator[String] = { + val loreIdStr = loreIdOf(this).map(id => s"[loreId=$id]") + val lorePathStr = getTagValue(LORE_DUMP_PATH_TAG).map(path => s"[lorePath=$path]") + val loreRDDInfoStr = getTagValue(LORE_DUMP_RDD_TAG).map(info => s"[loreRDDInfo=$info]") + + List(loreIdStr, lorePathStr, loreRDDInfoStr).flatten.iterator } private def dumpLoreMetaInfo(): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index be2c7968189..21a26023222 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -226,7 +226,7 @@ object GpuLore { newPlan } - def lordIdOf(node: SparkPlan): Option[String] = { + def loreIdOf(node: SparkPlan): Option[String] = { node.getTagValue(LORE_ID_TAG) } From 0bcf31436410fe0439da0c8aeb0737ac8908adfa Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 26 Jun 2024 15:31:47 +0800 Subject: [PATCH 15/17] Fix build break --- .../apache/spark/sql/execution/datasources/GpuWriteFiles.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala index 7cc94359daa..f1ffcf4df1f 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/execution/datasources/GpuWriteFiles.scala @@ -157,7 +157,7 @@ case class GpuWriteFilesExec( s" mismatch:\n$this") } - override protected def stringArgs: Iterator[Any] = Iterator(child) + override def stringArgs: Iterator[Any] = Iterator(child) } object GpuWriteFiles { From c5831370844c2a1ff942b91fc7c17484942ec5a9 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 27 Jun 2024 13:21:13 +0800 Subject: [PATCH 16/17] Fix path in loreinfo --- .../src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala | 2 +- .../scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index 21a26023222..17f4444db9a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -187,7 +187,7 @@ object GpuLore { val currentExecRootPath = new Path(loreOutputRootPath, s"loreId-$loreId") g.setTagValue(LORE_DUMP_PATH_TAG, currentExecRootPath.toString) val loreOutputInfo = LoreOutputInfo(outputLoreIds, - currentExecRootPath) + currentExecRootPath.toString) g.children.zipWithIndex.foreach { case (child, idx) => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala index 3d97f2784a3..47b7e8d1e63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/OutputLoreId.scala @@ -25,7 +25,9 @@ case class OutputLoreId(loreId: LoreId, partitionIds: Set[Int]) { partitionIds.contains(partitionId) } -case class LoreOutputInfo(outputLoreId: OutputLoreId, path: Path) +case class LoreOutputInfo(outputLoreId: OutputLoreId, pathStr: String) { + def path: Path = new Path(pathStr) +} object OutputLoreId { private val PARTITION_ID_RANGE_REGEX = raw"(\d+)-(\d+)".r("start", "end") From 1b387408e8e79f5007d628c8b5f58d17f445ae7a Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 27 Jun 2024 13:32:43 +0800 Subject: [PATCH 17/17] Remove path --- .../src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala | 2 +- .../src/main/scala/com/nvidia/spark/rapids/lore/dump.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala index 17f4444db9a..dc7705b6a2b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/GpuLore.scala @@ -40,7 +40,7 @@ case class LoreRDDMeta(numPartitions: Int, outputPartitions: Seq[Int], attrs: Se case class LoreRDDPartitionMeta(numBatches: Int, dataType: Seq[DataType]) trait GpuLoreRDD { - val rootPath: Path + def rootPath: Path def pathOfMeta: Path = new Path(rootPath, "rdd.meta") diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala index bcab79a54f2..bd3c88ec462 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/lore/dump.scala @@ -35,7 +35,7 @@ case class LoreDumpRDDInfo(idxInParent: Int, loreOutputInfo: LoreOutputInfo, att class GpuLoreDumpRDD(info: LoreDumpRDDInfo, input: RDD[ColumnarBatch]) extends RDD[ColumnarBatch](input) with GpuLoreRDD { - override val rootPath: Path = pathOfChild(info.loreOutputInfo.path, info.idxInParent) + override def rootPath: Path = pathOfChild(info.loreOutputInfo.path, info.idxInParent) private val hadoopConf = new SerializableConfiguration(this.context.hadoopConfiguration)