Lore2 withloreid #27

Closed · wants to merge 7 commits
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -19,6 +19,7 @@ For commonly used configurations and examples of setting options, please refer t

Name | Description | Default Value | Applicable at
-----|-------------|--------------|--------------
<a name="LORE.tagging"></a>spark.rapids.LORE.tagging|Enable tagging a LORE id to each gpu plan node|true|Runtime
<a name="alluxio.automount.enabled"></a>spark.rapids.alluxio.automount.enabled|Enable the feature of auto mounting the cloud storage to Alluxio. It requires the Alluxio master is the same node of Spark driver node. The Alluxio master's host and port will be read from alluxio.master.hostname and alluxio.master.rpc.port(default: 19998) from ALLUXIO_HOME/conf/alluxio-site.properties, then replace a cloud path which matches spark.rapids.alluxio.bucket.regex like "s3://bar/b.csv" to "alluxio://0.1.2.3:19998/bar/b.csv", and the bucket "s3://bar" will be mounted to "/bar" in Alluxio automatically.|false|Runtime
<a name="alluxio.bucket.regex"></a>spark.rapids.alluxio.bucket.regex|A regex to decide which bucket should be auto-mounted to Alluxio. E.g. when setting as "^s3://bucket.*", the bucket which starts with "s3://bucket" will be mounted to Alluxio and the path "s3://bucket-foo/a.csv" will be replaced to "alluxio://0.1.2.3:19998/bucket-foo/a.csv". It's only valid when setting spark.rapids.alluxio.automount.enabled=true. The default value matches all the buckets in "s3://" or "s3a://" scheme.|^s3a{0,1}://.*|Runtime
<a name="alluxio.home"></a>spark.rapids.alluxio.home|The Alluxio installation home path or link to the installation home path. |/opt/alluxio|Startup
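The new spark.rapids.LORE.tagging entry above is a runtime config, so it can be toggled per session. A minimal usage sketch, assuming an existing SparkSession named spark running with the RAPIDS plugin enabled:

```scala
// Hedged sketch: toggling the LORE tagging config at runtime.
// "spark" is assumed to be an active SparkSession with the RAPIDS plugin loaded.
spark.conf.set("spark.rapids.LORE.tagging", "true")   // default: tag every GPU plan node
spark.conf.set("spark.rapids.LORE.tagging", "false")  // turn tagging off, e.g. to keep plan strings unchanged
```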
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/join_test.py
@@ -933,7 +933,7 @@ def do_join(spark):
" or exists (select * from {} as r where l.b < r.b)"
).format(left_table_name, right_table_name))

capture_regexp = r"GpuBroadcastNestedLoopJoin ExistenceJoin\(exists#[0-9]+\),"
capture_regexp = r"GpuBroadcastNestedLoopJoin \[LOREID=[0-9]+\] ExistenceJoin\(exists#[0-9]+\),"
assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, capture_regexp,
conf={"spark.sql.adaptive.enabled": aqeEnabled})

@@ -957,7 +957,7 @@ def do_join(spark):
" or exists (select * from {} as r where l.b < l.a)"
).format(left_table_name, right_table_name))

capture_regexp = r"GpuBroadcastNestedLoopJoin ExistenceJoin\(exists#[0-9]+\),"
capture_regexp = r"GpuBroadcastNestedLoopJoin \[LOREID=[0-9]+\] ExistenceJoin\(exists#[0-9]+\),"
assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, capture_regexp,
conf={"spark.sql.adaptive.enabled": aqeEnabled})

25 changes: 21 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -15,11 +15,12 @@
*/
package com.nvidia.spark.rapids

import java.io.{File, FileOutputStream}
import java.io.{ByteArrayOutputStream, File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import ai.rapids.cudf._
import ai.rapids.cudf.ColumnWriterOptions._
@@ -99,9 +100,25 @@ object DumpUtils extends Logging {
}
}

def deserializeObject[T: ClassTag](readPath: String): T = {
val fileIn: FileInputStream = new FileInputStream(readPath)
val in: ObjectInputStream = new ObjectInputStream(fileIn)
val ret = in.readObject().asInstanceOf[T]
in.close()
ret
}

def serializeObject(obj: Any): Array[Byte] = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)
oos.close()
bos.toByteArray
}

private def dumpToParquetFileImp(table: Table, filePrefix: String): String = {
val path = genPath(filePrefix)
withResource(new ParquetDumper(path, table)) { dumper =>
withResource(new ParquetDumper(new FileOutputStream(path), table)) { dumper =>
dumper.writeTable(table)
path
}
@@ -129,9 +146,9 @@
}

// parquet dumper
class ParquetDumper(path: String, table: Table) extends HostBufferConsumer
class ParquetDumper(private[this] val outputStream: OutputStream, table: Table)
extends HostBufferConsumer
with AutoCloseable {
private[this] val outputStream = new FileOutputStream(path)
private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()

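The two helpers added above are symmetric apart from their I/O: serializeObject returns the raw bytes while deserializeObject reads from a file path. A minimal round-trip sketch, assuming the bytes are first written to a local file (the path and payload below are illustrative):

```scala
import java.io.FileOutputStream

import com.nvidia.spark.rapids.DumpUtils

// Serialize a java.io.Serializable payload to bytes, persist them, then read them back.
val payload: Array[String] = Array("IntegerType", "StringType") // illustrative payload
val bytes = DumpUtils.serializeObject(payload)

val path = "/tmp/example.meta" // hypothetical local path
val fos = new FileOutputStream(path)
try fos.write(bytes) finally fos.close()

// deserializeObject is ClassTag-based, so the expected type is given explicitly.
val restored = DumpUtils.deserializeObject[Array[String]](path)
assert(restored.sameElements(payload))
```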
131 changes: 124 additions & 7 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -18,20 +18,26 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.DumpUtils.serializeObject
import com.nvidia.spark.rapids.filecache.FileCacheConf
import com.nvidia.spark.rapids.lore.IdGen.loreIdOf
import com.nvidia.spark.rapids.profiling.ReplayDumpRDD
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rapids.LocationPreservingMapPartitionsRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

sealed class MetricsLevel(val num: Integer) extends Serializable {
def >=(other: MetricsLevel): Boolean =
@@ -116,7 +122,7 @@ object GpuMetric extends Logging {
val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time"

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
case w: WrappedGpuMetric => w.sqlMetric
case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}")
}

@@ -212,6 +218,28 @@ object GpuExec {
}

trait GpuExec extends SparkPlan {

// For LORE replay
@transient var loreReplayInputDir: String = null // use null rather than None to keep ser/de simple
@transient var loreIsReplayingOperator: Boolean = false
// For LORE dump
@transient var shouldDumpOutput: Boolean = false
@transient var dumpForLOREId: String = ""
@transient lazy val loreDumpOperator: Option[String] = RapidsConf.LORE_DUMP_OPERATOR.get(conf)
@transient lazy val loreDumpLOREIds: String = RapidsConf.LORE_DUMP_LORE_IDS.get(conf)
@transient lazy val loreDumpPartitions: String = RapidsConf.LORE_DUMP_PARTITIONS.get(conf)
@transient lazy val loreDumpPath: String = RapidsConf.LORE_DUMP_PATH.get(conf)

// For the LORE DumpedExecReplayer, the Spark plan is deserialized from the plan.meta file, so
// some of the transient fields will be null and we need to work around this
override protected def sparkContext = SparkSession.getActiveSession.get.sparkContext
override protected def waitForSubqueries(): Unit = synchronized {
// only do it when it's not doing LORE replaying
if (!loreIsReplayingOperator && loreReplayInputDir == null) {
super.waitForSubqueries()
}
}

import GpuMetric._
def sparkSession: SparkSession = {
SparkShimImpl.sessionFromPlan(this)
@@ -246,9 +274,9 @@ trait GpuExec extends SparkPlan {
*/
def outputBatching: CoalesceGoal = null

private [this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))
private[this] lazy val metricsConf = MetricsLevel(RapidsConf.METRICS_LEVEL.get(conf))

private [this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
private[this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
if (level >= metricsConf) {
WrappedGpuMetric(f)
} else {
@@ -306,7 +334,7 @@ trait GpuExec extends SparkPlan {
lazy val allMetrics: Map[String, GpuMetric] = Map(
NUM_OUTPUT_ROWS -> createMetric(outputRowsLevel, DESCRIPTION_NUM_OUTPUT_ROWS),
NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES)) ++
additionalMetrics
additionalMetrics

def gpuLongMetric(name: String): GpuMetric = allMetrics(name)

@@ -363,16 +391,105 @@
this.getTagValue(GpuExec.TASK_METRICS_TAG)

final override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val orig = internalDoExecuteColumnar()
val hadoopConf = new SerializableConfiguration(sparkSession.sparkContext.hadoopConfiguration)

def getOutputStream(hadoopPath: Path): FSDataOutputStream = {
val fs = hadoopPath.getFileSystem(hadoopConf.value)
fs.create(hadoopPath, true)
}

if (loreReplayInputDir != null) {
return new ReplayDumpRDD(sparkSession, loreReplayInputDir)
}
val className = this.getClass.getSimpleName
val myLoreId = loreIdOf(this).getOrElse("unknown")
if (loreDumpOperator.exists(o => o.equals(className)) ||
loreDumpLOREIds.split(',').contains(myLoreId)
) {
val childAsGpuExec = this.asInstanceOf[UnaryExecNode].child.asInstanceOf[GpuExec]
childAsGpuExec.shouldDumpOutput = true
childAsGpuExec.dumpForLOREId = myLoreId
val childPlanId = childAsGpuExec.id
// dump plan node
val planBytes = serializeObject(this)
val fos = getOutputStream(
new Path(new Path(loreDumpPath), s"lore_id=${myLoreId}_plan_id=${childPlanId}/plan.meta"))
fos.write(planBytes)
fos.close()
}
val shouldDumpOutputToBroadcast = shouldDumpOutput
val dumpForLOREIdToBroadcast = dumpForLOREId
val loreDumpPartitionsToBroadcast = loreDumpPartitions
val loreDumpPathToBroadcast = loreDumpPath

var orig: RDD[ColumnarBatch] = null
orig = internalDoExecuteColumnar()

val planId = id
val metrics = getTaskMetrics

metrics.map { gpuMetrics =>
// This is ugly, but it reduces the need to change all exec nodes, so we are doing it here
LocationPreservingMapPartitionsRDD(orig) { iter =>
gpuMetrics.makeSureRegistered()
iter

var batchId = 0
iter.map(cb => {

val tc = TaskContext.get()
if (shouldDumpOutputToBroadcast &&
loreDumpPartitionsToBroadcast.split(',').map(_.toInt).contains(tc.partitionId())) {

println(s"LORE dump activated, for the operator to dump output: " +
s"className: ${className}, " +
s"stage id: ${tc.stageId()}, " +
s"for LORE id: ${dumpForLOREIdToBroadcast}, " +
s"plan id: ${planId}, " +
s"partition id: ${tc.partitionId()}, " +
s"batch id: ${batchId}, " +
s"batch size: ${cb.numRows()} rows.")

val partitionId = TaskContext.get().partitionId()

{
// dump the column types of the batch to the configured dump path
val cbTypes = GpuColumnVector.extractTypes(cb)
val bytes = serializeObject(cbTypes)
val fos = getOutputStream(
new Path(new Path(loreDumpPathToBroadcast),
s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/col_types.meta"))
fos.write(bytes)
fos.close()
}

// dump the column batch data to the configured dump path
withResource(GpuColumnVector.from(cb)) { table =>
val fos = getOutputStream(
new Path(new Path(loreDumpPathToBroadcast),
s"lore_id=${dumpForLOREIdToBroadcast}_plan_id=${planId}/" +
s"partition_id=${partitionId}/" +
s"batch_id=${batchId}/cb_data.parquet"))
// ParquetDumper will close the output stream
withResource(new ParquetDumper(fos, table)) { dumper =>
dumper.writeTable(table)
}
}
}
batchId = batchId + 1
cb
})
}
}.getOrElse(orig)
}

override def nodeName: String = {
loreIdOf(this) match {
case Some(loreId) => s"${super.nodeName} [LOREID=$loreId]"
case None => s"${super.nodeName} [LOREID=unknown]"
}
}

protected def internalDoExecuteColumnar(): RDD[ColumnarBatch]
}
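The nodeName override above is what the updated capture_regexp in join_test.py keys on. A small sketch of the expected plan-string shape (the id values are illustrative):

```scala
// Hedged sketch: the tagged node name as matched by the updated test regex.
val nodeName = "GpuBroadcastNestedLoopJoin [LOREID=18] ExistenceJoin(exists#42),"
val captureRegexp = """GpuBroadcastNestedLoopJoin \[LOREID=[0-9]+\] ExistenceJoin\(exists#[0-9]+\),""".r
assert(captureRegexp.findFirstIn(nodeName).isDefined)
```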
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.RapidsConf.{SUPPRESS_PLANNING_FAILURE, TEST_CONF}
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.lore.IdGen
import com.nvidia.spark.rapids.shims._
import com.nvidia.spark.rapids.window.{GpuDenseRank, GpuLag, GpuLead, GpuPercentRank, GpuRank, GpuRowNumber, GpuSpecialFrameBoundary, GpuWindowExecMeta, GpuWindowSpecDefinitionMeta}
import org.apache.hadoop.fs.Path
@@ -4708,7 +4709,12 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
}
}
GpuOverrides.doConvertPlan(wrap, conf, optimizations)
val convertedPlan = GpuOverrides.doConvertPlan(wrap, conf, optimizations)
if (conf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
IdGen.tagLoreId(convertedPlan)
} else {
convertedPlan
}
}
}
}
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable

import com.nvidia.spark.rapids.lore.IdGen
import com.nvidia.spark.rapids.shims.{GpuBatchScanExec, SparkShimImpl}

import org.apache.spark.SparkContext
@@ -823,6 +824,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
}

if (rapidsConf.get(RapidsConf.TAG_LORE_ID_ENABLED)) {
updatedPlan = IdGen.tagLoreId(updatedPlan)
}

if (rapidsConf.logQueryTransformations) {
logWarning(s"Transformed query:" +
s"\nOriginal Plan:\n$plan\nTransformed Plan:\n$updatedPlan")
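Both GpuOverrides and GpuTransitionOverrides now call IdGen.tagLoreId after the plan is converted, and GpuExec.nodeName reads the id back through loreIdOf. The lore.IdGen object itself is not part of this diff; the following is only a sketch of the assumed contract, using a TreeNodeTag to stamp each GpuExec with an increasing id, and the real implementation may differ:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

// Sketch only: assumes it lives next to GpuExec in com.nvidia.spark.rapids so the
// GpuExec type is in scope; the real com.nvidia.spark.rapids.lore.IdGen is not shown here.
object IdGenSketch {
  private val LORE_ID_TAG = new TreeNodeTag[String]("rapids.gpu.loreId")
  private val nextId = new AtomicInteger(0)

  // Walk the converted plan and tag every GpuExec node that has no id yet.
  def tagLoreId(plan: SparkPlan): SparkPlan = {
    plan.foreach {
      case g: GpuExec if g.getTagValue(LORE_ID_TAG).isEmpty =>
        g.setTagValue(LORE_ID_TAG, nextId.incrementAndGet().toString)
      case _ =>
    }
    plan
  }

  // Mirrors the loreIdOf accessor used by GpuExec.nodeName above.
  def loreIdOf(plan: SparkPlan): Option[String] = plan.getTagValue(LORE_ID_TAG)
}
```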
40 changes: 40 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -708,6 +708,40 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValues(Set("DEBUG", "MODERATE", "ESSENTIAL"))
.createWithDefault("MODERATE")

val TAG_LORE_ID_ENABLED = conf("spark.rapids.LORE.tagging")
.doc("Enable tagging a LORE id to each gpu plan node")
.booleanConf
.createWithDefault(true)

val LORE_DUMP_OPERATOR = conf("spark.rapids.LORE.operatorToDump")
.doc("The name of SparkPlan to dump, e.g. GpuHashAggregateExec")
.internal()
.stringConf
.createOptional

val LORE_DUMP_LORE_IDS = conf("spark.rapids.LORE.idsToDump")
.doc("Specify which operator(s) to dump by LORE ID. " +
"Specify multiple partitions by using comma, e.g. \"12,31\"")
.internal()
.stringConf
.createWithDefault("")

val LORE_DUMP_PARTITIONS = conf("spark.rapids.LORE.partitionsToDump")
.doc("Which partition of the operator(the operator relates to a fixed stage, " +
"each stage is divided into many tasks by partition id) to dump. User can " +
"specify multiple partitions by using comma, e.g. \"0,3,5\"." +
"By default it will dump the first partitions.")
.internal()
.stringConf
.createWithDefault("0")

val LORE_DUMP_PATH = conf("spark.rapids.LORE.pathPrefix")
.doc("Specifies a URI path to use when dumping with LORE, the default path is: " +
"file:/tmp/lore/")
.internal()
.stringConf
.createWithDefault("file:/tmp/lore/")

val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
.doc("Enables profiling and specifies a URI path to use when writing profile data")
.internal()
@@ -2488,6 +2522,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val metricsLevel: String = get(METRICS_LEVEL)

lazy val loreDumpOperator: Option[String] = get(LORE_DUMP_OPERATOR)

lazy val loreDumpPartitions: String = get(LORE_DUMP_PARTITIONS)

lazy val loreDumpPath: String = get(LORE_DUMP_PATH)

lazy val profilePath: Option[String] = get(PROFILE_PATH)

lazy val profileExecutors: String = get(PROFILE_EXECUTORS)
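Putting the new knobs together, a dump run could be configured roughly as below. The values are illustrative, and this assumes the internal configs can be set at runtime (otherwise pass them as --conf options at submit time). With these settings, GpuExec writes each dump under pathPrefix/lore_id=<id>_plan_id=<planId>/partition_id=<p>/batch_id=<b>/, as built in the doExecuteColumnar changes above.

```scala
// Hedged sketch of a LORE dump configuration; ids, partitions and path are examples only.
spark.conf.set("spark.rapids.LORE.tagging", "true")             // keep LORE ids in the plan
spark.conf.set("spark.rapids.LORE.idsToDump", "12,31")          // dump the operators with these LORE ids
spark.conf.set("spark.rapids.LORE.partitionsToDump", "0,3,5")   // restrict the dump to these partitions
spark.conf.set("spark.rapids.LORE.pathPrefix", "file:/tmp/lore/")

// Alternative: dump every instance of one operator class instead of specific LORE ids.
spark.conf.set("spark.rapids.LORE.operatorToDump", "GpuHashAggregateExec")
```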