Commit

some refinement
sperlingxx committed Dec 13, 2024
1 parent 519f33c commit b3b6f80
Showing 4 changed files with 83 additions and 96 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/spark_session.py
@@ -329,4 +329,4 @@ def is_hive_available():
return _spark.conf.get("spark.sql.catalogImplementation") == "hive"

def is_hybrid_backend_loaded():
return _spark.conf.get("spark.rapids.sql.hybrid.load") == "true"
return _spark.conf.get("spark.rapids.sql.hybrid.loadBackend") == "true"
@@ -29,87 +29,102 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch],
targetBatchSizeInBytes: Int,
/**
* The Iterator wrapper of the underlying NativeConverter, which produces coalesced batches of
* HostColumnVectors. The iterator produces RapidsHostColumn instead of HostColumnVector in order
* to carry metadata about [Pinned|Pageable]MemCpy, which is displayed as Spark SQL Metrics.
*/
class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
targetBatchSizeInBytes: Long,
schema: StructType,
metrics: Map[String, GpuMetric])
extends Iterator[Array[RapidsHostColumn]] with Logging {

private var converterImpl: Option[NativeConverter] = None
private var converterImpl: NativeConverter = _

private var srcExhausted = false

private val converterMetrics = Map(
"C2COutputSize" -> GpuMetric.unwrap(metrics("C2COutputSize")))

override def hasNext(): Boolean = {
// either converter holds data or upstreaming iterator holds data
val ret = withResource(new NvtxWithMetrics("VeloxC2CHasNext", NvtxColor.WHITE,
metrics("C2CStreamTime"))) { _ =>
converterImpl.exists(c => c.isDeckFilled || c.hasProceedingBuilders) ||
(!srcExhausted && veloxIter.hasNext)
// isDeckFilled indicates whether unconverted source data remains on the deck.
// hasProceedingBuilders indicates whether there are working target vectors not yet flushed.
val selfHoldData = Option(converterImpl).exists { c =>
c.isDeckFilled || c.hasProceedingBuilders
}
if (!ret) {
if (!srcExhausted) {
srcExhausted = true
}
converterImpl.foreach { c =>
// VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
// Here we fetch the final value before closing it.
metrics("C2CTime") += c.eclipsedNanoSecond
// release the native instance when upstreaming iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logDebug(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
converterImpl = None
}
// Check srcExhausted first, so as to minimize the potential cost of an unnecessary call of
// cpuScanIter.hasNext
lazy val upstreamHoldData = !srcExhausted && cpuScanIter.hasNext
// Either the converter holds data or the upstream iterator holds data.
if (selfHoldData || upstreamHoldData) {
return true
}

if (!srcExhausted) {
srcExhausted = true
}
// Close the native Converter and dump column-level metrics for performance inspection.
Option(converterImpl).foreach { c =>
// VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
// Here we fetch the final value before closing it.
metrics("C2CTime") += c.eclipsedNanoSecond
// release the native instance once the upstream iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logInfo(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
converterImpl = null
}
ret
false
}

override def next(): Array[RapidsHostColumn] = {
val ntvx = new NvtxWithMetrics("VeloxC2CNext", NvtxColor.YELLOW, metrics("C2CStreamTime"))
withResource(ntvx) { _ =>
while (true) {
converterImpl.foreach { impl =>
val needFlush = if (veloxIter.hasNext) {
// The only condition leading to a nonEmpty deck is targetBuffers are unset after
// the previous flushing
if (impl.isDeckFilled) {
impl.setupTargetVectors()
}
// tryAppendBatch, if failed, the batch will be placed on the deck
metrics("CpuReaderBatches") += 1
!impl.tryAppendBatch(veloxIter.next())
} else {
srcExhausted = true
true
}
if (needFlush) {
metrics("CoalescedBatches") += 1
val rapidsHostBatch = impl.flush()
// It is essential to check and tidy up the deck right after flushing. Because if
// the next call of veloxIter.hasNext will release the batch which the deck holds
// its reference.
if (impl.isDeckFilled) {
impl.setupTargetVectors()
}
return rapidsHostBatch
}
require(!srcExhausted, "Please call hasNext beforehand to ensure there is remaining data")

// Initialize the nativeConverter with the first input batch
if (converterImpl == null) {
converterImpl = NativeConverter(
cpuScanIter.next(),
targetBatchSizeInBytes,
schema,
converterMetrics
)
}

// Keep consuming input batches from cpuScanIter until targetVectors reaches `targetBatchSize`
// or cpuScanIter is exhausted.
while (true) {
val needFlush = if (cpuScanIter.hasNext) {
metrics("CpuReaderBatches") += 1
// The only condition leading to a non-empty deck is that targetVectors were unset after
// the previous flush
if (converterImpl.isDeckFilled) {
converterImpl.setupTargetVectors()
}
if (converterImpl.isEmpty) {
val converter = NativeConverter(
veloxIter.next(),
targetBatchSizeInBytes, schema, converterMetrics
)
converterImpl = Some(converter)
// tryAppendBatch: if it fails, which indicates the remaining space of targetVectors is NOT
// enough for the current input batch, the batch will be placed on the deck and trigger
// a flush of the working targetVectors
!converterImpl.tryAppendBatch(cpuScanIter.next())
} else {
// If cpuScanIter is exhausted, flush targetVectors as the last output item.
srcExhausted = true
true
}
if (needFlush) {
metrics("CoalescedBatches") += 1
val rapidsHostBatch = converterImpl.flush()
// It is essential to check and tidy up the deck right after flushing, because the next call
// of cpuScanIter.hasNext will release the batch which the deck holds a reference to.
if (converterImpl.isDeckFilled) {
converterImpl.setupTargetVectors()
}
return rapidsHostBatch
}

throw new RuntimeException("should NOT reach this line")
}
}

throw new RuntimeException("should NOT reach this line")
}
}

object CoalesceConvertIterator extends Logging {
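The hunk above centers on a coalescing contract: batches from cpuScanIter are appended to the working target vectors until the next batch would exceed the target size, the batch that does not fit is parked on the deck, and the working vectors are flushed as one coalesced output. Below is a minimal, self-contained sketch of that pattern, using Seq[Int] batches and a plain buffer as hypothetical stand-ins for ColumnarBatch and the NativeConverter; none of the names in the sketch come from the commit.

// Simplified stand-ins: `deck` plays the role of the parked batch, `builder` the working
// target vectors, and `flush()` produces the coalesced output batch.
class CoalescingIterator(upstream: Iterator[Seq[Int]], targetBatchSize: Int)
  extends Iterator[Seq[Int]] {

  private val builder = Seq.newBuilder[Int]  // "working target vectors"
  private var builderSize = 0
  private var deck: Option[Seq[Int]] = None  // batch that did not fit into the last target
  private var srcExhausted = false

  override def hasNext: Boolean =
    // Either we hold pending data (deck or a partially built target) or upstream still has data.
    deck.nonEmpty || builderSize > 0 || (!srcExhausted && upstream.hasNext)

  override def next(): Seq[Int] = {
    require(hasNext, "call hasNext beforehand to ensure there is remaining data")
    // Tidy up the deck first: move the parked batch into the (empty) working target.
    deck.foreach { d => builder ++= d; builderSize += d.size; deck = None }
    // Keep appending upstream batches until the next one would overflow the target size,
    // or until upstream is exhausted.
    while (!srcExhausted) {
      if (upstream.hasNext) {
        val batch = upstream.next()
        if (builderSize > 0 && builderSize + batch.size > targetBatchSize) {
          deck = Some(batch)  // park the batch and flush the working target
          return flush()
        } else {
          builder ++= batch
          builderSize += batch.size
        }
      } else {
        srcExhausted = true
      }
    }
    flush()  // the last, possibly undersized, output batch
  }

  private def flush(): Seq[Int] = {
    val out = builder.result()
    builder.clear()
    builderSize = 0
    out
  }
}

// Example: with a target size of 4 elements this yields Seq(1, 2) and then Seq(3, 4, 5, 6).
// new CoalescingIterator(Iterator(Seq(1, 2), Seq(3, 4, 5), Seq(6)), targetBatchSize = 4).toList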
@@ -111,7 +111,6 @@ case class HybridFileSourceScanExec(originPlan: FileSourceScanExec
"CpuReaderBatches" -> (() => createMetric(MODERATE_LEVEL, "CpuReaderBatches")),
"C2COutputSize" -> (() => createSizeMetric(MODERATE_LEVEL, "C2COutputSize")),
"C2CTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "C2CTime")),
"C2CStreamTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "C2CStreamTime")),
"PageableH2DTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "PageableH2DTime")),
"PinnedH2DTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "PinnedH2DTime")),
"PageableH2DSize" -> (() => createSizeMetric(MODERATE_LEVEL, "PageableH2DSize")),
@@ -129,7 +128,7 @@ case class HybridFileSourceScanExec(originPlan: FileSourceScanExec
nativeMetrics.foreach { case (key, generator) =>
mapBuilder += key -> generator()
}
// Expose all metrics of the underlying GlutenScanExec
// Expose all metrics of the underlying CpuNativeScanExec
nativePlan.metrics.foreach { case (key, metric) =>
mapBuilder += s"Hybrid_$key" -> GpuMetric.wrap(metric)
}
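The metrics wiring in this hunk materializes the exec's own metric generators and then re-exposes every metric of the underlying native plan under a Hybrid_ prefix. A small sketch of that map construction follows, assuming plain Longs as stand-ins for GpuMetric/SQLMetric; buildMetricsMap, own and childMetrics are hypothetical names, not part of the commit.

// `own` mirrors the (key -> generator) map of the exec's own metrics,
// `childMetrics` mirrors nativePlan.metrics.
def buildMetricsMap(own: Map[String, () => Long],
    childMetrics: Map[String, Long]): Map[String, Long] = {
  val mapBuilder = Map.newBuilder[String, Long]
  // Materialize the exec's own lazily created metrics.
  own.foreach { case (key, generator) => mapBuilder += key -> generator() }
  // Re-expose every metric of the underlying native scan under a "Hybrid_" prefix,
  // so both sets show up side by side in the SQL UI.
  childMetrics.foreach { case (key, metric) => mapBuilder += s"Hybrid_$key" -> metric }
  mapBuilder.result()
}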
@@ -22,7 +22,6 @@ import org.apache.spark.{InterruptibleIterator, Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -33,50 +32,24 @@ class HybridParquetScanRDD(scanRDD: RDD[ColumnarBatch],
metrics: Map[String, GpuMetric],
) extends RDD[InternalRow](scanRDD.sparkContext, Nil) {

private val hybridScanTime = GpuMetric.unwrap(metrics("HybridScanTime"))

override protected def getPartitions: Array[Partition] = scanRDD.partitions

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
// the wrapping Iterator for the underlying VeloxScan task
val metricsIter = new HybridScanMetricsIter(scanRDD.compute(split, context), hybridScanTime)
// the wrapping Iterator for the underlying HybridNativeScan task
val hybridScanIter = scanRDD.compute(split, context)

val schema = StructType(outputAttr.map { ar =>
StructField(ar.name, ar.dataType, ar.nullable)
})
require(coalesceGoal.targetSizeBytes <= Int.MaxValue,
s"targetSizeBytes should be smaller than 2GB, but got ${coalesceGoal.targetSizeBytes}"
)
val hostResultIter = new CoalesceConvertIterator(
metricsIter, coalesceGoal.targetSizeBytes.toInt, schema, metrics
hybridScanIter,
coalesceGoal.targetSizeBytes,
schema,
metrics
)
val deviceIter = CoalesceConvertIterator.hostToDevice(hostResultIter, outputAttr, metrics)

// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(context, deviceIter.asInstanceOf[Iterator[InternalRow]])
}
}

// In terms of CPU parquet reader, both hasNext and next might be time-consuming. So, it is
// necessary to take account of the hasNext time as well.
private class HybridScanMetricsIter(iter: Iterator[ColumnarBatch],
scanTime: SQLMetric
) extends Iterator[ColumnarBatch] {
override def hasNext: Boolean = {
val start = System.nanoTime()
try {
iter.hasNext
} finally {
scanTime += System.nanoTime() - start
}
}

override def next(): ColumnarBatch = {
val start = System.nanoTime()
try {
iter.next()
} finally {
scanTime += System.nanoTime() - start
}
}
}
