From b3b6f8075050f020722554eeba341e56a25c477a Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Fri, 13 Dec 2024 18:16:27 +0900
Subject: [PATCH] some refinement

---
 .../src/main/python/spark_session.py          |   2 +-
 .../hybrid/CoalesceConvertIterator.scala      | 135 ++++++++++--------
 .../hybrid/HybridFileSourceScanExec.scala     |   3 +-
 .../rapids/hybrid/HybridParquetScanRDD.scala  |  39 +----
 4 files changed, 83 insertions(+), 96 deletions(-)

diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index 2dd61a05c1a..3bfaad25941 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -329,4 +329,4 @@ def is_hive_available():
     return _spark.conf.get("spark.sql.catalogImplementation") == "hive"
 
 def is_hybrid_backend_loaded():
-    return _spark.conf.get("spark.rapids.sql.hybrid.load") == "true"
+    return _spark.conf.get("spark.rapids.sql.hybrid.loadBackend") == "true"

diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
index e9481fa775e..45d9008cbc4 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala
@@ -29,13 +29,18 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
-class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch],
-                              targetBatchSizeInBytes: Int,
+/**
+ * The Iterator wrapper of the underlying NativeConverter, which produces coalesced batches of
+ * HostColumnVectors. The iterator emits RapidsHostColumn instead of HostColumnVector in order
+ * to carry metadata about [Pinned|Pageable]MemCpy, which is displayed as Spark SQL metrics.
+ */
+class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
+                              targetBatchSizeInBytes: Long,
                               schema: StructType,
                               metrics: Map[String, GpuMetric])
   extends Iterator[Array[RapidsHostColumn]] with Logging {
 
-  private var converterImpl: Option[NativeConverter] = None
+  private var converterImpl: NativeConverter = _
 
   private var srcExhausted = false
 
@@ -43,73 +48,83 @@ class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch],
     "C2COutputSize" -> GpuMetric.unwrap(metrics("C2COutputSize")))
 
   override def hasNext(): Boolean = {
-    // either converter holds data or upstreaming iterator holds data
-    val ret = withResource(new NvtxWithMetrics("VeloxC2CHasNext", NvtxColor.WHITE,
-      metrics("C2CStreamTime"))) { _ =>
-      converterImpl.exists(c => c.isDeckFilled || c.hasProceedingBuilders) ||
-        (!srcExhausted && veloxIter.hasNext)
+    // isDeckFilled: whether unconverted source data remains on the deck.
+    // hasProceedingBuilders: whether there are working target vectors not flushed yet.
+    val selfHoldData = Option(converterImpl).exists { c =>
+      c.isDeckFilled || c.hasProceedingBuilders
     }
-    if (!ret) {
-      if (!srcExhausted) {
-        srcExhausted = true
-      }
-      converterImpl.foreach { c =>
-        // VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
-        // Here we fetch the final value before closing it.
-        metrics("C2CTime") += c.eclipsedNanoSecond
-        // release the native instance when upstreaming iterator has been exhausted
-        val detailedMetrics = c.close()
-        val tID = TaskContext.get().taskAttemptId()
-        logDebug(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
-        converterImpl = None
-      }
+    // Check srcExhausted first, to minimize the potential cost of an unnecessary call to
+    // cpuScanIter.hasNext.
+    lazy val upstreamHoldData = !srcExhausted && cpuScanIter.hasNext
+    // Either the converter holds data or the upstream iterator holds data.
+    if (selfHoldData || upstreamHoldData) {
+      return true
+    }
+
+    if (!srcExhausted) {
+      srcExhausted = true
+    }
+    // Close the native converter and dump column-level metrics for performance inspection.
+    Option(converterImpl).foreach { c =>
+      // VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
+      // Here we fetch the final value before closing it.
+      metrics("C2CTime") += c.eclipsedNanoSecond
+      // release the native instance once the upstream iterator has been exhausted
+      val detailedMetrics = c.close()
+      val tID = TaskContext.get().taskAttemptId()
+      logInfo(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
+      converterImpl = null
     }
-    ret
+    false
   }
 
   override def next(): Array[RapidsHostColumn] = {
-    val ntvx = new NvtxWithMetrics("VeloxC2CNext", NvtxColor.YELLOW, metrics("C2CStreamTime"))
-    withResource(ntvx) { _ =>
-      while (true) {
-        converterImpl.foreach { impl =>
-          val needFlush = if (veloxIter.hasNext) {
-            // The only condition leading to a nonEmpty deck is targetBuffers are unset after
-            // the previous flushing
-            if (impl.isDeckFilled) {
-              impl.setupTargetVectors()
-            }
-            // tryAppendBatch, if failed, the batch will be placed on the deck
-            metrics("CpuReaderBatches") += 1
-            !impl.tryAppendBatch(veloxIter.next())
-          } else {
-            srcExhausted = true
-            true
-          }
-          if (needFlush) {
-            metrics("CoalescedBatches") += 1
-            val rapidsHostBatch = impl.flush()
-            // It is essential to check and tidy up the deck right after flushing. Because if
-            // the next call of veloxIter.hasNext will release the batch which the deck holds
-            // its reference.
-            if (impl.isDeckFilled) {
-              impl.setupTargetVectors()
-            }
-            return rapidsHostBatch
-          }
-        }
-        if (converterImpl.isEmpty) {
-          val converter = NativeConverter(
-            veloxIter.next(),
-            targetBatchSizeInBytes, schema, converterMetrics
-          )
-          converterImpl = Some(converter)
-        }
-      }
-
-      throw new RuntimeException("should NOT reach this line")
-    }
-  }
+    require(!srcExhausted, "Please call hasNext first to ensure there is remaining data")
+
+    // Initialize the nativeConverter with the first input batch.
+    if (converterImpl == null) {
+      converterImpl = NativeConverter(
+        cpuScanIter.next(),
+        targetBatchSizeInBytes,
+        schema,
+        converterMetrics
+      )
+    }
+
+    // Keep consuming input batches from cpuScanIter until targetVectors reaches
+    // `targetBatchSizeInBytes` or cpuScanIter is exhausted.
+    while (true) {
+      val needFlush = if (cpuScanIter.hasNext) {
+        metrics("CpuReaderBatches") += 1
+        // The only condition leading to a non-empty deck is that targetVectors were unset
+        // after the previous flush.
+        if (converterImpl.isDeckFilled) {
+          converterImpl.setupTargetVectors()
+        }
+        // Try to append the batch. If it fails, the remaining space of targetVectors is NOT
+        // enough for the current input batch; the batch is then placed on the deck,
+        // triggering a flush of the working targetVectors.
+        !converterImpl.tryAppendBatch(cpuScanIter.next())
+      } else {
+        // If cpuScanIter is exhausted, flush targetVectors as the last output item.
+        srcExhausted = true
+        true
+      }
+      if (needFlush) {
+        metrics("CoalescedBatches") += 1
+        val rapidsHostBatch = converterImpl.flush()
+        // It is essential to check and tidy up the deck right after flushing, because the
+        // next call of cpuScanIter.hasNext may release the batch whose reference the deck
+        // still holds.
+        if (converterImpl.isDeckFilled) {
+          converterImpl.setupTargetVectors()
+        }
+        return rapidsHostBatch
+      }
+    }
+
+    throw new RuntimeException("should NOT reach this line")
+  }
 }
 
 object CoalesceConvertIterator extends Logging {

diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala
index f7fbb7197d7..e75666fe137 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridFileSourceScanExec.scala
@@ -111,7 +111,6 @@ case class HybridFileSourceScanExec(originPlan: FileSourceScanExec
     "CpuReaderBatches" -> (() => createMetric(MODERATE_LEVEL, "CpuReaderBatches")),
     "C2COutputSize" -> (() => createSizeMetric(MODERATE_LEVEL, "C2COutputSize")),
     "C2CTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "C2CTime")),
-    "C2CStreamTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "C2CStreamTime")),
     "PageableH2DTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "PageableH2DTime")),
     "PinnedH2DTime" -> (() => createNanoTimingMetric(MODERATE_LEVEL, "PinnedH2DTime")),
     "PageableH2DSize" -> (() => createSizeMetric(MODERATE_LEVEL, "PageableH2DSize")),
@@ -129,7 +128,7 @@ case class HybridFileSourceScanExec(originPlan: FileSourceScanExec
     nativeMetrics.foreach { case (key, generator) =>
       mapBuilder += key -> generator()
     }
-    // Expose all metrics of the underlying GlutenScanExec
+    // Expose all metrics of the underlying CpuNativeScanExec
     nativePlan.metrics.foreach { case (key, metric) =>
       mapBuilder += s"Hybrid_$key" -> GpuMetric.wrap(metric)
     }

diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala
index d3a2ed50886..9a64fb8cf11 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridParquetScanRDD.scala
@@ -22,7 +22,6 @@ import org.apache.spark.{InterruptibleIterator, Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -33,22 +32,20 @@ class HybridParquetScanRDD(scanRDD: RDD[ColumnarBatch],
                            metrics: Map[String, GpuMetric],
                           ) extends RDD[InternalRow](scanRDD.sparkContext, Nil) {
 
-  private val hybridScanTime = GpuMetric.unwrap(metrics("HybridScanTime"))
-
   override protected def getPartitions: Array[Partition] = scanRDD.partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    // the wrapping Iterator for the underlying VeloxScan task
-    val metricsIter = new HybridScanMetricsIter(scanRDD.compute(split, context), hybridScanTime)
+    // the iterator of the underlying HybridNativeScan task
+    val hybridScanIter = scanRDD.compute(split, context)
 
     val schema = StructType(outputAttr.map { ar =>
       StructField(ar.name, ar.dataType, ar.nullable)
     })
-    require(coalesceGoal.targetSizeBytes <= Int.MaxValue,
-      s"targetSizeBytes should be smaller than 2GB, but got ${coalesceGoal.targetSizeBytes}"
-    )
     val hostResultIter = new CoalesceConvertIterator(
-      metricsIter, coalesceGoal.targetSizeBytes.toInt, schema, metrics
+      hybridScanIter,
+      coalesceGoal.targetSizeBytes,
+      schema,
+      metrics
     )
 
     val deviceIter = CoalesceConvertIterator.hostToDevice(hostResultIter, outputAttr, metrics)
@@ -56,27 +53,3 @@ class HybridParquetScanRDD(scanRDD: RDD[ColumnarBatch],
     new InterruptibleIterator(context, deviceIter.asInstanceOf[Iterator[InternalRow]])
   }
 }
-
-// In terms of CPU parquet reader, both hasNext and next might be time-consuming. So, it is
-// necessary to take account of the hasNext time as well.
-private class HybridScanMetricsIter(iter: Iterator[ColumnarBatch],
-                                    scanTime: SQLMetric
-                                   ) extends Iterator[ColumnarBatch] {
-  override def hasNext: Boolean = {
-    val start = System.nanoTime()
-    try {
-      iter.hasNext
-    } finally {
-      scanTime += System.nanoTime() - start
-    }
-  }
-
-  override def next(): ColumnarBatch = {
-    val start = System.nanoTime()
-    try {
-      iter.next()
-    } finally {
-      scanTime += System.nanoTime() - start
-    }
-  }
-}
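--

Note (illustrative, not part of the patch): the hasNext/next contract above relies on a "deck"
that parks the input batch which overflowed the working target vectors, plus a flush that emits
the coalesced output. The following standalone Scala sketch mirrors that coalesce-until-target-size
pattern with plain collections. All names here (SimpleCoalesceIterator, Batch) are hypothetical;
nothing below depends on Spark or the RAPIDS plugin.

import scala.collection.mutable.ArrayBuffer

object CoalesceSketch {
  // A toy stand-in for ColumnarBatch: a row of Ints, 4 bytes each.
  final case class Batch(rows: Array[Int]) {
    def sizeInBytes: Long = rows.length.toLong * 4L
  }

  class SimpleCoalesceIterator(upstream: Iterator[Batch], targetSizeInBytes: Long)
      extends Iterator[Batch] {
    // "Deck": the batch that did not fit and waits for the next output round.
    private var deck: Option[Batch] = None
    private val working = ArrayBuffer.empty[Int]
    private var workingSize = 0L

    // Data remains if the deck is occupied, the working buffer is non-empty,
    // or the upstream iterator still has batches.
    override def hasNext: Boolean = deck.nonEmpty || workingSize > 0 || upstream.hasNext

    override def next(): Batch = {
      require(hasNext, "Please call hasNext first to ensure there is remaining data")
      // Drain the deck first: it holds the batch that triggered the previous flush.
      deck.foreach { b => append(b); deck = None }
      while (upstream.hasNext) {
        val b = upstream.next()
        if (workingSize + b.sizeInBytes > targetSizeInBytes && workingSize > 0) {
          deck = Some(b) // does not fit: park it on the deck and flush
          return flush()
        }
        append(b)
      }
      flush() // upstream exhausted: flush whatever remains as the last output
    }

    private def append(b: Batch): Unit = { working ++= b.rows; workingSize += b.sizeInBytes }

    private def flush(): Batch = {
      val out = Batch(working.toArray)
      working.clear(); workingSize = 0L
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val src = Iterator(Batch(Array(1, 2)), Batch(Array(3, 4, 5)), Batch(Array(6)))
    val it = new SimpleCoalesceIterator(src, targetSizeInBytes = 16L)
    it.foreach(b => println(b.rows.mkString(",")))
  }
}

Running main prints "1,2" then "3,4,5,6": the 12-byte batch overflows the 16-byte target, lands
on the deck, and is coalesced with the final batch on the next call, just as tryAppendBatch's
failure triggers a flush in the real iterator.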