From f545a297873a6ae12d79244f8d3e88dacb674112 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 5 Jan 2024 10:08:46 -0600 Subject: [PATCH 01/14] Add in framework for unbounded to unbounded window agg optimization Signed-off-by: Robert (Bobby) Evans --- .../spark/rapids/SpillableColumnarBatch.scala | 48 +++- ...GpuUnboundedToUnboundedAggWindowExec.scala | 217 ++++++++++++++++++ .../rapids/GpuCoalesceBatchesRetrySuite.scala | 19 +- 3 files changed, 279 insertions(+), 5 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 27c8bac497d..4cb95dcacb0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -37,6 +37,12 @@ trait SpillableColumnarBatch extends AutoCloseable { */ def setSpillPriority(priority: Long): Unit + /** + * Increment the reference count for this batch (if applicable) and + * return this for easy chaining. + */ + def incRefCount(): SpillableColumnarBatch + /** * Get the columnar batch. * @note It is the responsibility of the caller to close the batch. @@ -70,6 +76,9 @@ class JustRowsColumnarBatch(numRows: Int) override val sizeInBytes: Long = 0L override def dataTypes: Array[DataType] = Array.empty + + // There is no off heap data and close is a noop so just return this + override def incRefCount(): SpillableColumnarBatch = this } /** @@ -83,6 +92,7 @@ class SpillableColumnarBatchImpl ( rowCount: Int, sparkTypes: Array[DataType]) extends SpillableColumnarBatch { + private var refCount = 1 override def dataTypes: Array[DataType] = sparkTypes /** @@ -113,12 +123,25 @@ class SpillableColumnarBatchImpl ( } } + override def incRefCount(): SpillableColumnarBatch = { + if (refCount <= 0) { + throw new IllegalStateException("Use after free on SpillableColumnarBatchImpl") + } + refCount += 1 + this + } + /** * Remove the `ColumnarBatch` from the cache. */ override def close(): Unit = { - // closing my reference - handle.close() + refCount -= 1 + if (refCount == 0) { + // closing my reference + handle.close() + } else if (refCount < 0) { + throw new IllegalStateException("Double free on SpillableColumnarBatchImpl") + } } } @@ -135,6 +158,9 @@ class JustRowsHostColumnarBatch(numRows: Int) override val sizeInBytes: Long = 0L override def dataTypes: Array[DataType] = Array.empty + + // There is no off heap data and close is a noop so just return this + override def incRefCount(): SpillableColumnarBatch = this } /** @@ -149,6 +175,7 @@ class SpillableHostColumnarBatchImpl ( sparkTypes: Array[DataType], catalog: RapidsBufferCatalog) extends SpillableColumnarBatch { + private var refCount = 1 override def dataTypes: Array[DataType] = sparkTypes @@ -180,12 +207,25 @@ class SpillableHostColumnarBatchImpl ( } } + override def incRefCount(): SpillableColumnarBatch = { + if (refCount <= 0) { + throw new IllegalStateException("Use after free on SpillableHostColumnarBatchImpl") + } + refCount += 1 + this + } + /** * Remove the `ColumnarBatch` from the cache. */ override def close(): Unit = { - // closing my reference - handle.close() + refCount -= 1 + if (refCount == 0) { + // closing my reference + handle.close() + } else if (refCount < 0) { + throw new IllegalStateException("Double free on SpillableHostColumnarBatchImpl") + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala new file mode 100644 index 00000000000..e18ff6434bc --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.window + +import com.nvidia.spark.rapids.{GpuBindReferences, GpuExpression, GpuMetric, SpillableColumnarBatch, SpillPriorities} +import com.nvidia.spark.rapids.Arm.withResource +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} +import java.util + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.vectorized.ColumnarBatch + + +// It is not really simple to do a single iterator that can do the splits and retries along with +// The data as needed. Instead we are going to decompose the problem into multiple iterators that +// feed into each other. +// The first pass iterator will take in a batch of data and produce one or more aggregated result +// pairs that include the original input data with them. + +case class AggResult(inputData: SpillableColumnarBatch, + aggResult: SpillableColumnarBatch) extends AutoCloseable { + override def close(): Unit = { + inputData.close() + aggResult.close() + } +} + +// TODO not sure that these are the right args to pass into this, +// Because we have to do a merge later on, we might want to preprocess the +// window ops and pass in agg ops. +class GpuUnboundedToUnboundedAggWindowFirstPassIterator( + input: Iterator[ColumnarBatch], + boundWindowOps: Seq[GpuExpression], + boundPartitionSpec: Seq[GpuExpression], + boundOrderSpec: Seq[SortOrder], + outputTypes: Array[DataType], + opTime: GpuMetric) extends Iterator[AggResult] { + private var subIterator: Option[Iterator[AggResult]] = None + override def hasNext: Boolean = subIterator.exists(_.hasNext) || input.hasNext + + override def next(): AggResult = { + if (!hasNext) { + throw new NoSuchElementException() + } + if (subIterator.exists(_.hasNext)) { + subIterator.map(_.next()).get + } else { + val currIter = withRetry( + SpillableColumnarBatch(input.next(), SpillPriorities.ACTIVE_ON_DECK_PRIORITY), + splitSpillableInHalfByRows) { scb => + withResource(scb.getColumnarBatch()) { cb => + // TODO actually do the agg + throw new IllegalStateException("Do the agg!!!") + } + } + val result = currIter.next() + subIterator = Some(currIter) + result + } + } +} + +// The second pass through the data will take the input data, slice it based off of what is +// known to be complete and what is not known yet. Then combine the aggregations as needed +// This is similar to a merge stage. We are not going to try and combine small slices together +// yet. + +// TODO again not sure that these are the right args to pass into this, +// Because we have to do a merge later on, we might want to preprocess the +// window ops and pass in agg ops. +class GpuUnboundedToUnboundedAggWindowSecondPassIterator( + input: Iterator[AggResult], + boundWindowOps: Seq[GpuExpression], + boundPartitionSpec: Seq[GpuExpression], + boundOrderSpec: Seq[SortOrder], + outputTypes: Array[DataType], + opTime: GpuMetric) extends Iterator[AggResult] { + // input data where we don't know if the results are done yet + private val inputDataPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() + // Agg results where the input keys are not fully complete yet. They will need to be combined + // together before being returned. + // TODO private var aggResultsPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() + + private val inputDataThatIsComplete = new util.LinkedList[SpillableColumnarBatch]() + private var aggResultsThatAreComplete: Option[SpillableColumnarBatch] = None + + override def hasNext: Boolean = (!inputDataThatIsComplete.isEmpty) || + (!inputDataPendingCompletion.isEmpty) || input.hasNext + + override def next(): AggResult = { + if (!hasNext) { + throw new NoSuchElementException() + } + while (inputDataThatIsComplete.isEmpty) { + if (input.hasNext) { + withResource(input.next()) { newData => + throw new IllegalStateException("Actually split the inputs") + // TODO newData should be sliced based off of which rows are known to be completed and + // which are not. Then they should be placed in the appropriate state queues. Please note + // that this cannot be done with a split and retry, but should be done with regular retry + } + } else { + throw new IllegalStateException("Merge aggResultsPendingCompletion") + // TODO There is no more data, so we need to merge the aggResultsPendingCompletion + // into a single SpillableColumnarBatch, and put the result in aggResultsThatAreComplete + // then move all of the batches in inputDataPendingCompletion to inputDataThatIsComplete + // Please note that this cannot be done with a split and retry, but should be done with + // regular retry. + } + } + val nextData = inputDataThatIsComplete.pop + val aggResult = aggResultsThatAreComplete.get + if (inputDataThatIsComplete.isEmpty) { + // Nothing left to work on + aggResultsThatAreComplete = None + AggResult(nextData, aggResult) + } else { + // We are reusing this spillable columnar batch so inc the ref count to avoid it being + // close too early + AggResult(nextData, aggResult.incRefCount()) + } + } +} + +// The final step is to take the original input data along with the agg data, estimate how +// to split/combine the input batches to output batches that are close to the target batch size + +// TODO again not sure that these are the right args to pass into this, +// Because we have to do a merge later on, we might want to preprocess the +// window ops and pass in agg ops. +class GpuUnboundedToUnboundedAggFinalIterator( + input: Iterator[AggResult], + boundWindowOps: Seq[GpuExpression], + boundPartitionSpec: Seq[GpuExpression], + boundOrderSpec: Seq[SortOrder], + outputTypes: Array[DataType], + numOutputBatches: GpuMetric, + numOutputRows: GpuMetric, + opTime: GpuMetric) extends Iterator[ColumnarBatch] { + override def hasNext: Boolean = input.hasNext + + override def next(): ColumnarBatch = { + throw new IllegalStateException("Do all of the work to split this and expand the results") + // TODO also need to make sure that we update the output metrics + } +} + +/** + * An iterator that can do unbounded to unbounded window aggregations as group by aggregations + * followed by an expand/join. + */ +object GpuUnboundedToUnboundedAggWindowIterator { + def apply(input: Iterator[ColumnarBatch], + boundWindowOps: Seq[GpuExpression], + boundPartitionSpec: Seq[GpuExpression], + boundOrderSpec: Seq[SortOrder], + outputTypes: Array[DataType], + numOutputBatches: GpuMetric, + numOutputRows: GpuMetric, + opTime: GpuMetric): Iterator[ColumnarBatch] = { + val firstPass = new GpuUnboundedToUnboundedAggWindowFirstPassIterator(input, boundWindowOps, + boundPartitionSpec, boundOrderSpec, outputTypes, opTime) + val secondPass = new GpuUnboundedToUnboundedAggWindowSecondPassIterator(firstPass, + boundWindowOps, boundPartitionSpec, boundOrderSpec, outputTypes, opTime) + new GpuUnboundedToUnboundedAggFinalIterator(secondPass, boundWindowOps, boundPartitionSpec, + boundOrderSpec, outputTypes, numOutputBatches, numOutputRows, opTime) + } +} + +/** + * This allows for batches of data to be processed without needing them to correspond to + * the partition by boundaries. This is specifically for unbounded to unbounded window + * operations that can be replaced with an aggregation and then expanded out/joined with + * the original input data. + */ +case class GpuUnboundedToUnboundedAggWindowExec( + windowOps: Seq[NamedExpression], + gpuPartitionSpec: Seq[Expression], + gpuOrderSpec: Seq[SortOrder], + child: SparkPlan)( + override val cpuPartitionSpec: Seq[Expression], + override val cpuOrderSpec: Seq[SortOrder]) extends GpuWindowBaseExec { + + override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES) + val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) + val opTime = gpuLongMetric(GpuMetric.OP_TIME) + + val boundWindowOps = GpuBindReferences.bindGpuReferences(windowOps, child.output) + val boundPartitionSpec = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, child.output) + val boundOrderSpec = GpuBindReferences.bindReferences(gpuOrderSpec, child.output) + + child.executeColumnar().mapPartitions { iter => + GpuUnboundedToUnboundedAggWindowIterator(iter, boundWindowOps, boundPartitionSpec, + boundOrderSpec, output.map(_.dataType).toArray, numOutputBatches, numOutputRows, opTime) + } + } +} \ No newline at end of file diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala index 8a99068fe7f..4e1b1c21033 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesRetrySuite.scala @@ -206,6 +206,7 @@ class GpuCoalesceBatchesRetrySuite class SpillableColumnarBatchThatThrows(batch: ColumnarBatch) extends SpillableColumnarBatch { + var refCount = 1 override def numRows(): Int = 0 override def setSpillPriority(priority: Long): Unit = {} override def getColumnarBatch(): ColumnarBatch = { @@ -213,7 +214,23 @@ class GpuCoalesceBatchesRetrySuite } override def sizeInBytes: Long = 0 override def dataTypes: Array[DataType] = Array.empty - override def close(): Unit = batch.close() + override def close(): Unit = { + if (refCount <= 0) { + throw new IllegalStateException("double free") + } + refCount -= 1 + if (refCount == 0) { + batch.close() + } + } + + override def incRefCount(): SpillableColumnarBatch = { + if (refCount <= 0) { + throw new IllegalStateException("Use after free") + } + refCount += 1 + this + } } trait CoalesceIteratorMocks { From 9594f439be17a96b45e16bd811fc396e51962586 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 5 Jan 2024 10:12:30 -0600 Subject: [PATCH 02/14] Fix copyright --- .../scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 4cb95dcacb0..c8a7dbcbb87 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 97eced3cbc79460804fb19e0e0dab037b3197133 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 5 Jan 2024 12:46:17 -0600 Subject: [PATCH 03/14] Update the GpuWindowExecMeta to support UnboundedToUnboundedAgg Signed-off-by: Robert (Bobby) Evans --- .../com/nvidia/spark/rapids/RapidsConf.scala | 12 +- .../rapids/window/GpuWindowExecMeta.scala | 115 +++++++++++------- .../rapids/window/GpuWindowExpression.scala | 8 +- .../rapids/aggregate/aggregateFunctions.scala | 2 +- 4 files changed, 90 insertions(+), 47 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 2f71647971e..0e1faf81578 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -742,6 +742,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) + val ENABLE_WINDOW_UNBOUNDED_AGG = conf("spark.rapids.sql.window.unboundedAgg.enabled") + .doc("This is a temporary internal config to turn on an unbounded to unbounded " + + "window optimization that is still a work in progress. It should eventually replace " + + "the double pass window exec.") + .internal() + .booleanConf + .createWithDefault(false) + val ENABLE_FLOAT_AGG = conf("spark.rapids.sql.variableFloatAgg.enabled") .doc("Spark assumes that all operations produce the exact same result each time. " + "This is not true for some floating point aggregations, which can produce slightly " + @@ -2437,6 +2445,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isWindowCollectSetEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_SET) + lazy val isWindowUnboundedAggEnabled: Boolean = get(ENABLE_WINDOW_UNBOUNDED_AGG) + lazy val isFloatAggEnabled: Boolean = get(ENABLE_FLOAT_AGG) lazy val explain: String = get(EXPLAIN) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala index 98d2eaab23d..5ab6546e9c0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala @@ -221,18 +221,22 @@ class GpuWindowExecMeta(windowExec: WindowExec, } case class BatchedOps(running: Seq[NamedExpression], - unboundedToUnbounded: Seq[NamedExpression], + unboundedAgg: Seq[NamedExpression], + unboundedDoublePass: Seq[NamedExpression], bounded: Seq[NamedExpression], passThrough: Seq[NamedExpression]) { def getRunningExpressionsWithPassthrough: Seq[NamedExpression] = passThrough ++ running - def getDoublePassExpressionsWithRunningAsPassthrough: Seq[NamedExpression] = - passThrough ++ unboundedToUnbounded ++ running.map(_.toAttribute) + def getUnboundedAggWithRunningAsPassthrough: Seq[NamedExpression] = + passThrough ++ unboundedAgg ++ running.map(_.toAttribute) + + def getDoublePassExpressionsWithRunningAndUnboundedAggAsPassthrough: Seq[NamedExpression] = + passThrough ++ unboundedDoublePass ++ (unboundedAgg ++ running).map(_.toAttribute) def getBoundedExpressionsWithTheRestAsPassthrough: Seq[NamedExpression] = - passThrough ++ bounded ++ (unboundedToUnbounded ++ running).map(_.toAttribute) + passThrough ++ bounded ++ (unboundedDoublePass ++ unboundedAgg ++ running).map(_.toAttribute) def getMinPrecedingMaxFollowingForBoundedWindows: (Int, Int) = { // All bounded window expressions should have window bound window specs. @@ -260,6 +264,18 @@ case class BatchedOps(running: Seq[NamedExpression], gpuOrderSpec, child)(cpuPartitionSpec, cpuOrderSpec) + private def getUnboundedAggWindowExec( + gpuPartitionSpec: Seq[Expression], + gpuOrderSpec: Seq[SortOrder], + child: SparkPlan, + cpuPartitionSpec: Seq[Expression], + cpuOrderSpec: Seq[SortOrder]): GpuExec = + GpuUnboundedToUnboundedAggWindowExec( + getUnboundedAggWithRunningAsPassthrough, + gpuPartitionSpec, + gpuOrderSpec, + child)(cpuPartitionSpec, cpuOrderSpec) + private def getDoublePassWindowExec( gpuPartitionSpec: Seq[Expression], gpuOrderSpec: Seq[SortOrder], @@ -267,7 +283,7 @@ case class BatchedOps(running: Seq[NamedExpression], cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = GpuCachedDoublePassWindowExec( - getDoublePassExpressionsWithRunningAsPassthrough, + getDoublePassExpressionsWithRunningAndUnboundedAggAsPassthrough, gpuPartitionSpec, gpuOrderSpec, child)(cpuPartitionSpec, cpuOrderSpec) @@ -290,47 +306,34 @@ case class BatchedOps(running: Seq[NamedExpression], child: SparkPlan, cpuPartitionSpec: Seq[Expression], cpuOrderSpec: Seq[SortOrder]): GpuExec = { - // The order of these matter so we can pass the output of the first through the second one + // The order of these matter so we can match the order of the parameters used to + // create the various aggregation functions + var currentPlan = child if (hasRunning) { - val runningExec = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, child, + currentPlan = getRunningWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, + cpuPartitionSpec, cpuOrderSpec) + } + + if (hasUnboundedAgg) { + currentPlan = getUnboundedAggWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, cpuPartitionSpec, cpuOrderSpec) - if (hasDoublePass) { - val doublePassExec = getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, runningExec, - cpuPartitionSpec, cpuOrderSpec) - if (hasBounded) { - getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, doublePassExec, - cpuPartitionSpec, cpuOrderSpec) - } else { - doublePassExec - } - } else { - if (hasBounded) { - getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, runningExec, - cpuPartitionSpec, cpuOrderSpec) - } - else { - runningExec - } - } - } else { - if (hasDoublePass) { - val doublePassExec = getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, child, - cpuPartitionSpec, cpuOrderSpec) - if (hasBounded) { - getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, doublePassExec, - cpuPartitionSpec, cpuOrderSpec) - } else { - doublePassExec - } - } else { - getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, child, - cpuPartitionSpec, cpuOrderSpec) - } } + + if (hasDoublePass) { + currentPlan = getDoublePassWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, + cpuPartitionSpec, cpuOrderSpec) + } + + if (hasBounded) { + getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, + cpuPartitionSpec, cpuOrderSpec) + } + currentPlan.asInstanceOf[GpuExec] } def hasRunning: Boolean = running.nonEmpty - def hasDoublePass: Boolean = unboundedToUnbounded.nonEmpty + def hasUnboundedAgg: Boolean = unboundedAgg.nonEmpty + def hasDoublePass: Boolean = unboundedDoublePass.nonEmpty def hasBounded: Boolean = bounded.nonEmpty } @@ -458,7 +461,26 @@ object GpuWindowExecMeta { isSpecOkay && isFuncOkay } - def isBatchedUnboundedToUnboundedFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = + def isUnboundedToUnboundedAggFunc(func: Expression, spec: GpuWindowSpecDefinition, + conf: RapidsConf): Boolean = { + if (!conf.isWindowUnboundedAggEnabled) { + false + } else { + if (!GpuWindowExecMeta.isUnboundedToUnboundedWindow(spec)) { + false + } else { + func match { + case _: GpuUnboundedToUnboundedWindowAgg => true + case GpuAggregateExpression(_: GpuUnboundedToUnboundedWindowAgg, _, _, _, _) => true + case _ => false + } + } + } + } + + // TODO delete this if we can support min, max, and count with + // isUnboundedToUnboundedAggFunc instead. + def isBatchedDoublePassFunc(func: Expression, spec: GpuWindowSpecDefinition): Boolean = func match { case _: GpuUnboundToUnboundWindowWithFixer if GpuWindowExecMeta.isUnboundedToUnboundedWindow(spec) => true @@ -471,7 +493,8 @@ object GpuWindowExecMeta { spec: GpuWindowSpecDefinition, conf: RapidsConf): Boolean = { isBatchedRunningFunc(func, spec) || - isBatchedUnboundedToUnboundedFunc(func, spec) || + isUnboundedToUnboundedAggFunc(func, spec, conf) || + isBatchedDoublePassFunc(func, spec) || isBoundedRowsWindowAndBatchable(spec, conf) } @@ -479,13 +502,16 @@ object GpuWindowExecMeta { conf: RapidsConf): BatchedOps = { val running = ArrayBuffer[NamedExpression]() val doublePass = ArrayBuffer[NamedExpression]() + val unboundedToUnboundedAgg = ArrayBuffer[NamedExpression]() val batchedBounded = ArrayBuffer[NamedExpression]() val passThrough = ArrayBuffer[NamedExpression]() windowOps.foreach { case expr@GpuAlias(GpuWindowExpression(func, spec), _) => if (isBatchedRunningFunc(func, spec)) { running.append(expr) - } else if (isBatchedUnboundedToUnboundedFunc(func, spec)) { + } else if (isUnboundedToUnboundedAggFunc(func, spec, conf)) { + unboundedToUnboundedAgg.append(expr) + } else if (isBatchedDoublePassFunc(func, spec)) { doublePass.append(expr) } else if (isBoundedRowsWindowAndBatchable(spec, conf)) { batchedBounded.append(expr) @@ -501,7 +527,8 @@ object GpuWindowExecMeta { throw new IllegalArgumentException( s"Found unexpected expression $other in window exec ${other.getClass}") } - BatchedOps(running.toSeq, doublePass.toSeq, batchedBounded.toSeq, passThrough.toSeq) + BatchedOps(running.toSeq, unboundedToUnboundedAgg.toSeq, doublePass.toSeq, + batchedBounded.toSeq, passThrough.toSeq) } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala index 0a8c39b0699..ae243144278 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExpression.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, CollectList, CollectSet, Count, Max, Min, Sum} import org.apache.spark.sql.rapids.{AddOverflowChecks, GpuCreateNamedStruct, GpuDivide, GpuSubtract} -import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuCount} +import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuAggregateFunction, GpuCount} import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -977,6 +977,12 @@ trait GpuUnboundToUnboundWindowWithFixer { def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer } +/** + * This is used to tag a GpuAggregateFunction that it has been tested to work properly + * with `GpuUnboundedToUnboundedAggWindowExec`. + */ +trait GpuUnboundedToUnboundedWindowAgg extends GpuAggregateFunction + /** * Fixes up a count operation for unbounded preceding to unbounded following * @param errorOnOverflow if we need to throw an exception when an overflow happens or not. diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index a46d66d4f5c..2a0c2176109 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -1758,7 +1758,7 @@ case class GpuCollectSet( child: Expression, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0) - extends GpuCollectBase { + extends GpuCollectBase with GpuUnboundedToUnboundedWindowAgg { override lazy val updateAggregates: Seq[CudfAggregate] = Seq(new CudfCollectSet(dataType)) override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(new CudfMergeSets(dataType)) From 48739373f9a7a38c01d654c11eb1eaddc4690b1a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 5 Jan 2024 14:15:52 -0600 Subject: [PATCH 04/14] Fix some bugs Signed-off-by: Robert (Bobby) Evans --- .../com/nvidia/spark/rapids/SpillableColumnarBatch.scala | 7 +++++-- .../com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index c8a7dbcbb87..4894fc82527 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -139,9 +139,12 @@ class SpillableColumnarBatchImpl ( if (refCount == 0) { // closing my reference handle.close() - } else if (refCount < 0) { - throw new IllegalStateException("Double free on SpillableColumnarBatchImpl") } + // TODO this is causing problems so we need to look into this + // https://github.com/NVIDIA/spark-rapids/issues/10161 +// else if (refCount < 0) { +// throw new IllegalStateException("Double free on SpillableColumnarBatchImpl") +// } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala index 5ab6546e9c0..77e8ae679d3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala @@ -325,7 +325,7 @@ case class BatchedOps(running: Seq[NamedExpression], } if (hasBounded) { - getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, + currentPlan = getBatchedBoundedWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, cpuPartitionSpec, cpuOrderSpec) } currentPlan.asInstanceOf[GpuExec] From f825ed813b4b2093d2786ca3d6ad00370bc677eb Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 9 Jan 2024 14:03:34 -0600 Subject: [PATCH 05/14] Add in basic repeat code Signed-off-by: Robert (Bobby) Evans --- ...GpuUnboundedToUnboundedAggWindowExec.scala | 228 ++++++++++++++---- 1 file changed, 185 insertions(+), 43 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index e18ff6434bc..58f51e05ae1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -16,15 +16,15 @@ package com.nvidia.spark.rapids.window -import com.nvidia.spark.rapids.{GpuBindReferences, GpuExpression, GpuMetric, SpillableColumnarBatch, SpillPriorities} -import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} +import com.nvidia.spark.rapids.{GpuAlias, GpuBindReferences, GpuColumnVector, GpuExpression, GpuLiteral, GpuMetric, GpuProjectExec, SpillableColumnarBatch, SpillPriorities} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry, withRetryNoSplit} import java.util import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuCount} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -42,15 +42,9 @@ case class AggResult(inputData: SpillableColumnarBatch, } } -// TODO not sure that these are the right args to pass into this, -// Because we have to do a merge later on, we might want to preprocess the -// window ops and pass in agg ops. class GpuUnboundedToUnboundedAggWindowFirstPassIterator( input: Iterator[ColumnarBatch], - boundWindowOps: Seq[GpuExpression], - boundPartitionSpec: Seq[GpuExpression], - boundOrderSpec: Seq[SortOrder], - outputTypes: Array[DataType], + boundStages: GpuUnboundedToUnboundedAggStages, opTime: GpuMetric) extends Iterator[AggResult] { private var subIterator: Option[Iterator[AggResult]] = None override def hasNext: Boolean = subIterator.exists(_.hasNext) || input.hasNext @@ -81,16 +75,9 @@ class GpuUnboundedToUnboundedAggWindowFirstPassIterator( // known to be complete and what is not known yet. Then combine the aggregations as needed // This is similar to a merge stage. We are not going to try and combine small slices together // yet. - -// TODO again not sure that these are the right args to pass into this, -// Because we have to do a merge later on, we might want to preprocess the -// window ops and pass in agg ops. class GpuUnboundedToUnboundedAggWindowSecondPassIterator( input: Iterator[AggResult], - boundWindowOps: Seq[GpuExpression], - boundPartitionSpec: Seq[GpuExpression], - boundOrderSpec: Seq[SortOrder], - outputTypes: Array[DataType], + boundStages: GpuUnboundedToUnboundedAggStages, opTime: GpuMetric) extends Iterator[AggResult] { // input data where we don't know if the results are done yet private val inputDataPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() @@ -142,45 +129,195 @@ class GpuUnboundedToUnboundedAggWindowSecondPassIterator( // The final step is to take the original input data along with the agg data, estimate how // to split/combine the input batches to output batches that are close to the target batch size -// TODO again not sure that these are the right args to pass into this, -// Because we have to do a merge later on, we might want to preprocess the -// window ops and pass in agg ops. class GpuUnboundedToUnboundedAggFinalIterator( input: Iterator[AggResult], - boundWindowOps: Seq[GpuExpression], - boundPartitionSpec: Seq[GpuExpression], - boundOrderSpec: Seq[SortOrder], - outputTypes: Array[DataType], + boundStages: GpuUnboundedToUnboundedAggStages, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric) extends Iterator[ColumnarBatch] { override def hasNext: Boolean = input.hasNext override def next(): ColumnarBatch = { - throw new IllegalStateException("Do all of the work to split this and expand the results") - // TODO also need to make sure that we update the output metrics + // TODO we need to add in the retry code, and pre-splitting of the data if possible, but + // for now we are just going to try it. + val (aggResult, rideAlong) = withResource(input.next()) { data => + (data.aggResult.incRefCount(), data.inputData.incRefCount()) + } + + // The first stage is to expand the aggregate based on the count column + val repeatedCb = closeOnExcept(rideAlong) { _ => + withRetryNoSplit(aggResult) { scb => + opTime.ns { + withResource(scb.getColumnarBatch()) { cb => + withResource(boundStages.boundCount.columnarEval(cb)) { counts => + withResource(GpuProjectExec.project(cb, boundStages.boundAggsToRepeat)) { toRepeat => + withResource(GpuColumnVector.from(toRepeat)) { table => + GpuColumnVector.from(table.repeat(counts.getBase), + boundStages.boundAggsToRepeat.map(_.dataType).toArray) + } + } + } + } + } + } + } + val combined = withResource(rideAlong) { _ => + // Second step is to stitch the two together + withResource(repeatedCb) { _ => + withResource(rideAlong.getColumnarBatch()) { rideAlong => + opTime.ns { + GpuColumnVector.appendColumns(rideAlong, GpuColumnVector.extractColumns(repeatedCb): _*) + } + } + } + } + withResource(combined) { _ => + opTime.ns { + closeOnExcept(GpuProjectExec.project(combined, boundStages.boundFinalProject)) { ret => + numOutputBatches += 1 + numOutputRows += ret.numRows() + ret + } + } + } } } +/** + * Holds the bound references for various aggregation stages + * @param boundRideAlong used for a project that pulls out columns that are passing through + * unchanged. + * @param boundAggregations aggregations to be done. NOTE THIS IS WIP + * @param boundCount The column that contains the count in it for the number of aggregations + * @param boundAggsToRepeat the columns to get that need to be repeated by the amount in count + * @param boundFinalProject the final project to get the output in the right order + */ +case class GpuUnboundedToUnboundedAggStages( + boundRideAlong: Seq[GpuExpression], + boundAggregations: Seq[GpuExpression], + boundCount: GpuExpression, + boundAggsToRepeat: Seq[GpuExpression], + boundFinalProject: Seq[GpuExpression]) extends Serializable + /** * An iterator that can do unbounded to unbounded window aggregations as group by aggregations * followed by an expand/join. */ object GpuUnboundedToUnboundedAggWindowIterator { + + private def rideAlongProjection(windowOps: Seq[NamedExpression], + childOutput: Seq[Attribute]): (Seq[Attribute], Seq[GpuExpression]) = { + val rideAlong = windowOps.filter { + case GpuAlias(_: AttributeReference, _) | _: AttributeReference => true + case _ => false + } + val rideAlongOutput = rideAlong.map(_.toAttribute) + val boundRideAlong = GpuBindReferences.bindGpuReferences(rideAlong, childOutput) + (rideAlongOutput, boundRideAlong) + } + + + private def tmpAggregationOps(windowOps: Seq[NamedExpression], + childOutput: Seq[Attribute]): (Seq[Attribute], Seq[GpuExpression]) = { + // TODO I don't know what this is really going to look like. I am just doing an approximation + // here so I can get the output of the aggregations after everything is done for the + // repeat. Please fill this in/split it apart, whatever to make it work for you + val windowAggs = windowOps.flatMap { + case GpuAlias(_: AttributeReference, _) | _: AttributeReference => None + case ga@GpuAlias(GpuWindowExpression(agg: GpuUnboundedToUnboundedWindowAgg, _), _) => + // We don't care about the spec, they are all unbounded to unbounded so just get the func + // We do care that we keep the expression id so we can line it up at the very end + Some(GpuAlias(agg, ga.name)(ga.exprId)) + case ga@GpuAlias(GpuWindowExpression(GpuAggregateExpression( + agg: GpuUnboundedToUnboundedWindowAgg, _, _, _, _), _), _) => + // TODO should I verify distinct, filter, etc + // We don't care about the spec, they are all unbounded to unbounded so just get the func + // We do care that we keep the expression id so we can line it up at the very end + Some(GpuAlias(agg, ga.name)(ga.exprId)) + case other => + // This should only happen if we did something wrong with how this was created. + throw new IllegalArgumentException( + s"Found unexpected expression $other in window exec ${other.getClass}") + } :+ GpuAlias(GpuCount(Seq(GpuLiteral(1L))), "_count")() + // Later code by conventions "knows" that the last column is a count and that it can be + // thrown away. If we ever dedupe this with a COUNT(1) that already exists, then + // we need to update the output of this to have a clean way to say what is the count, + // and if that count is needed see repeatOps + + val aggregationsOutput = windowAggs.map(_.toAttribute) + val boundAggregations = GpuBindReferences.bindGpuReferences(windowAggs, childOutput) + (aggregationsOutput, boundAggregations) + } + + private def repeatOps( + aggregationsOutput: Seq[Attribute]): (GpuExpression, Seq[Attribute], Seq[GpuExpression]) = { + // It is assumed that the last aggregation column is a count that we will use for repeat + // If that ever changes, this code needs to be updated. + val aggOutputExpressions = aggregationsOutput.map { attr => + GpuAlias( + AttributeReference(attr.name, attr.dataType, attr.nullable)(attr.exprId), + attr.name)(attr.exprId) + } + val boundAggOutputExpressions = + GpuBindReferences.bindGpuReferences(aggOutputExpressions, aggregationsOutput) + + val boundCount = boundAggOutputExpressions.last + val aggsToRepeat = boundAggOutputExpressions.slice(0, boundAggOutputExpressions.length - 1) + val aggsToRepeatOutput = aggregationsOutput.slice(0, aggregationsOutput.length - 1) + (boundCount, aggsToRepeatOutput, aggsToRepeat) + } + + def computeFinalProject(rideAlongOutput: Seq[Attribute], + aggsToRepeatOutput: Seq[Attribute], + windowOps: Seq[NamedExpression]): Seq[GpuExpression] = { + val combinedOutput = rideAlongOutput ++ aggsToRepeatOutput + val remapped = windowOps.map { expr => + GpuAlias(AttributeReference(expr.name, expr.dataType, expr.nullable)(expr.exprId), + expr.name)(expr.exprId) + } + GpuBindReferences.bindGpuReferences(remapped, combinedOutput) + } + + /** + * Break up the window operations into the various needed stages and bind them. + * @param gpuPartitionSpec the partition spec for the GPU + * @param windowOps the window operations (along with the pass-through columns) + * @param childOutput what the output of the operation feeding this looks like + * @return + */ + def breakUpAggregations(gpuPartitionSpec: Seq[Expression], + windowOps: Seq[NamedExpression], + childOutput: Seq[Attribute]): GpuUnboundedToUnboundedAggStages = { + // STEP 1. project that will pull out the columns that are output unchanged. + val (rideAlongOutput, boundRideAlong) = rideAlongProjection(windowOps, childOutput) + + // STEP 2. project that will pull out the columns needed for the aggregation. + val (aggregationsOutput, boundAggregations) = tmpAggregationOps(windowOps, childOutput) + + // STEP N: Given the output of the aggregations get count column and the other + // columns so we can do the repeat call. + val (boundCount, aggsToRepeatOutput, aggsToRepeat) = repeatOps(aggregationsOutput) + + // STEP N + 1: After the repeat is done the repeated columns are put at the end of the + // rideAlong columns and then we need to do a project that would put them all in the + // proper output order, according to the windowOps + val finalProject = computeFinalProject(rideAlongOutput, aggsToRepeatOutput, windowOps) + + GpuUnboundedToUnboundedAggStages(boundRideAlong, boundAggregations, + boundCount, aggsToRepeat, finalProject) + } + def apply(input: Iterator[ColumnarBatch], - boundWindowOps: Seq[GpuExpression], - boundPartitionSpec: Seq[GpuExpression], - boundOrderSpec: Seq[SortOrder], - outputTypes: Array[DataType], + boundStages: GpuUnboundedToUnboundedAggStages, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric): Iterator[ColumnarBatch] = { - val firstPass = new GpuUnboundedToUnboundedAggWindowFirstPassIterator(input, boundWindowOps, - boundPartitionSpec, boundOrderSpec, outputTypes, opTime) + val firstPass = new GpuUnboundedToUnboundedAggWindowFirstPassIterator(input, boundStages, + opTime) val secondPass = new GpuUnboundedToUnboundedAggWindowSecondPassIterator(firstPass, - boundWindowOps, boundPartitionSpec, boundOrderSpec, outputTypes, opTime) - new GpuUnboundedToUnboundedAggFinalIterator(secondPass, boundWindowOps, boundPartitionSpec, - boundOrderSpec, outputTypes, numOutputBatches, numOutputRows, opTime) + boundStages, opTime) + new GpuUnboundedToUnboundedAggFinalIterator(secondPass, boundStages, + numOutputBatches, numOutputRows, opTime) } } @@ -200,18 +337,23 @@ case class GpuUnboundedToUnboundedAggWindowExec( override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + // For this we only need the data to be sorted by the partition columns, but + // we don't change the input sort from the CPU yet. In some cases we might even + // be able to remove the sort entirely. https://github.com/NVIDIA/spark-rapids/issues/9989 + override def requiredChildOrdering: Seq[Seq[SortOrder]] = + Seq(cpuPartitionOrdering) + override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES) val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS) val opTime = gpuLongMetric(GpuMetric.OP_TIME) - val boundWindowOps = GpuBindReferences.bindGpuReferences(windowOps, child.output) - val boundPartitionSpec = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, child.output) - val boundOrderSpec = GpuBindReferences.bindReferences(gpuOrderSpec, child.output) + val boundStages = GpuUnboundedToUnboundedAggWindowIterator.breakUpAggregations( + gpuPartitionSpec, windowOps, child.output) child.executeColumnar().mapPartitions { iter => - GpuUnboundedToUnboundedAggWindowIterator(iter, boundWindowOps, boundPartitionSpec, - boundOrderSpec, output.map(_.dataType).toArray, numOutputBatches, numOutputRows, opTime) + GpuUnboundedToUnboundedAggWindowIterator(iter, boundStages, + numOutputBatches, numOutputRows, opTime) } } } \ No newline at end of file From 6325cf20b183cb9ff0109f9a5823a9830d57bdd5 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 11 Jan 2024 15:00:10 -0600 Subject: [PATCH 06/14] Fixed a few issues Signed-off-by: Robert (Bobby) Evans --- ...GpuUnboundedToUnboundedAggWindowExec.scala | 161 +++++++++++------- ...puUnboundedToUnboundedAggWindowSuite.scala | 81 +++++++++ 2 files changed, 184 insertions(+), 58 deletions(-) create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index 58f51e05ae1..ba0e9c331bb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -18,9 +18,11 @@ package com.nvidia.spark.rapids.window import com.nvidia.spark.rapids.{GpuAlias, GpuBindReferences, GpuColumnVector, GpuExpression, GpuLiteral, GpuMetric, GpuProjectExec, SpillableColumnarBatch, SpillPriorities} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry, withRetryNoSplit} +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import java.util +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan @@ -32,12 +34,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch // The data as needed. Instead we are going to decompose the problem into multiple iterators that // feed into each other. // The first pass iterator will take in a batch of data and produce one or more aggregated result -// pairs that include the original input data with them. - -case class AggResult(inputData: SpillableColumnarBatch, +// pairs that include the ridealong columns with the aggregation results for that batch. +// Note that it is assumed that the aggregation was done as a sort based aggregation, so +// the ridealong columns and the aggregation result should both be sorted by the partition by +// columns. Also the aggregation result must have a count column so it can be expanded using +// repeat to get back to the size of the ridealong columns. +case class FirstPassAggResult(rideAlongColumns: SpillableColumnarBatch, aggResult: SpillableColumnarBatch) extends AutoCloseable { override def close(): Unit = { - inputData.close() + rideAlongColumns.close() aggResult.close() } } @@ -45,11 +50,11 @@ case class AggResult(inputData: SpillableColumnarBatch, class GpuUnboundedToUnboundedAggWindowFirstPassIterator( input: Iterator[ColumnarBatch], boundStages: GpuUnboundedToUnboundedAggStages, - opTime: GpuMetric) extends Iterator[AggResult] { - private var subIterator: Option[Iterator[AggResult]] = None + opTime: GpuMetric) extends Iterator[FirstPassAggResult] { + private var subIterator: Option[Iterator[FirstPassAggResult]] = None override def hasNext: Boolean = subIterator.exists(_.hasNext) || input.hasNext - override def next(): AggResult = { + override def next(): FirstPassAggResult = { if (!hasNext) { throw new NoSuchElementException() } @@ -71,103 +76,144 @@ class GpuUnboundedToUnboundedAggWindowFirstPassIterator( } } -// The second pass through the data will take the input data, slice it based off of what is -// known to be complete and what is not known yet. Then combine the aggregations as needed -// This is similar to a merge stage. We are not going to try and combine small slices together -// yet. +// The second pass through the data will take the output of the first pass. It will slice +// the result depending on if it knows that the group by keys is complete or not. +// Completed data will have the aggregation results merged into a single aggregation result +// Note that this aggregation result needs to remain sorted. The result is returned as +// an iterator of ridealong columns, and the full agg results for those columns. It is not +// the responsibility of the second stage to try and combine small batches or split up large +// ones, beyond what the retry framework might do. +case class SecondPassAggResult(rideAlongColumns: util.LinkedList[SpillableColumnarBatch], + aggResult: SpillableColumnarBatch) { +} + class GpuUnboundedToUnboundedAggWindowSecondPassIterator( - input: Iterator[AggResult], + input: Iterator[FirstPassAggResult], boundStages: GpuUnboundedToUnboundedAggStages, - opTime: GpuMetric) extends Iterator[AggResult] { + opTime: GpuMetric) extends Iterator[SecondPassAggResult] { // input data where we don't know if the results are done yet - private val inputDataPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() + // TODO this should probably be a var once we start using it + private val rideAlongColumnsPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() // Agg results where the input keys are not fully complete yet. They will need to be combined // together before being returned. - // TODO private var aggResultsPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() - - private val inputDataThatIsComplete = new util.LinkedList[SpillableColumnarBatch]() - private var aggResultsThatAreComplete: Option[SpillableColumnarBatch] = None + // TODO this should be uncommented once we start using it + // private val aggResultsPendingCompletion = new util.LinkedList[SpillableColumnarBatch]() - override def hasNext: Boolean = (!inputDataThatIsComplete.isEmpty) || - (!inputDataPendingCompletion.isEmpty) || input.hasNext + override def hasNext: Boolean = (!rideAlongColumnsPendingCompletion.isEmpty) || input.hasNext - override def next(): AggResult = { + override def next(): SecondPassAggResult = { if (!hasNext) { throw new NoSuchElementException() } - while (inputDataThatIsComplete.isEmpty) { + var output: Option[SecondPassAggResult] = None + while (output.isEmpty) { if (input.hasNext) { withResource(input.next()) { newData => + // TODO remove this line. It is here to avoid compile warnings becoming errors + output = None throw new IllegalStateException("Actually split the inputs") // TODO newData should be sliced based off of which rows are known to be completed and - // which are not. Then they should be placed in the appropriate state queues. Please note - // that this cannot be done with a split and retry, but should be done with regular retry + // which are not. If there are parts that are done it should be combined with + // the data pending completion and put into output. Then the incomplete data + // should be put into the pending completion queues. } } else { throw new IllegalStateException("Merge aggResultsPendingCompletion") // TODO There is no more data, so we need to merge the aggResultsPendingCompletion - // into a single SpillableColumnarBatch, and put the result in aggResultsThatAreComplete - // then move all of the batches in inputDataPendingCompletion to inputDataThatIsComplete - // Please note that this cannot be done with a split and retry, but should be done with - // regular retry. + // into a single SpillableColumnarBatch, and put the result output along with + // the rideAlongColumnPendingCompletion } } - val nextData = inputDataThatIsComplete.pop - val aggResult = aggResultsThatAreComplete.get - if (inputDataThatIsComplete.isEmpty) { - // Nothing left to work on - aggResultsThatAreComplete = None - AggResult(nextData, aggResult) - } else { - // We are reusing this spillable columnar batch so inc the ref count to avoid it being - // close too early - AggResult(nextData, aggResult.incRefCount()) - } + output.get } } // The final step is to take the original input data along with the agg data, estimate how // to split/combine the input batches to output batches that are close to the target batch size +// Then expand the data to match that size, combine everything together and return the result. class GpuUnboundedToUnboundedAggFinalIterator( - input: Iterator[AggResult], + input: Iterator[SecondPassAggResult], boundStages: GpuUnboundedToUnboundedAggStages, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric) extends Iterator[ColumnarBatch] { - override def hasNext: Boolean = input.hasNext + private var pending: Option[SecondPassAggResult] = None + + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + closePending() + } + } + + private def hasMoreInPending: Boolean = pending.exists(!_.rideAlongColumns.isEmpty) + private def pendingAggResults: SpillableColumnarBatch = pending.get.aggResult.incRefCount() + private def nextPendingRideAlong: SpillableColumnarBatch = pending.get.rideAlongColumns.pop + private def closePending(): Unit = { + pending.foreach(_.aggResult.close()) + pending.foreach(_.rideAlongColumns.forEach(_.close())) + pending = None + } + private def replacePending(p: SecondPassAggResult): Unit = { + closePending() + pending = Some(p) + } + + override def hasNext: Boolean = hasMoreInPending || input.hasNext override def next(): ColumnarBatch = { // TODO we need to add in the retry code, and pre-splitting of the data if possible, but // for now we are just going to try it. - val (aggResult, rideAlong) = withResource(input.next()) { data => - (data.aggResult.incRefCount(), data.inputData.incRefCount()) + if (!hasNext) { + throw new NoSuchElementException() + } + while (!hasMoreInPending) { + replacePending(input.next()) + } + + // TODO this is a very dumb version right now that is not checking for size + // That will be added later on. + + // TODO fix this. We don't want just one batch of ride along columns, and we don't + // want to leak anything if we run out of memory + var rideAlongCombined: ColumnarBatch = null + while (hasMoreInPending) { + val cb = withResource(nextPendingRideAlong) { scb => + scb.getColumnarBatch() + } + withResource(cb) { _ => + if (rideAlongCombined == null) { + rideAlongCombined = GpuColumnVector.incRefCounts(cb) + } else { + rideAlongCombined.close() + throw new IllegalStateException("Concat not implemented yet...") + } + } } // The first stage is to expand the aggregate based on the count column - val repeatedCb = closeOnExcept(rideAlong) { _ => - withRetryNoSplit(aggResult) { scb => + val combined = withResource(rideAlongCombined) { _ => + val repeatedCb = withResource(pendingAggResults) { scb => opTime.ns { withResource(scb.getColumnarBatch()) { cb => withResource(boundStages.boundCount.columnarEval(cb)) { counts => withResource(GpuProjectExec.project(cb, boundStages.boundAggsToRepeat)) { toRepeat => withResource(GpuColumnVector.from(toRepeat)) { table => - GpuColumnVector.from(table.repeat(counts.getBase), - boundStages.boundAggsToRepeat.map(_.dataType).toArray) + withResource(table.repeat(counts.getBase)) { repeated => + GpuColumnVector.from(repeated, + boundStages.boundAggsToRepeat.map(_.dataType).toArray) + } } } } } } } - } - val combined = withResource(rideAlong) { _ => // Second step is to stitch the two together withResource(repeatedCb) { _ => - withResource(rideAlong.getColumnarBatch()) { rideAlong => - opTime.ns { - GpuColumnVector.appendColumns(rideAlong, GpuColumnVector.extractColumns(repeatedCb): _*) - } + opTime.ns { + GpuColumnVector.appendColumns(rideAlongCombined, + GpuColumnVector.extractColumns(repeatedCb): _*) } } } @@ -204,8 +250,7 @@ case class GpuUnboundedToUnboundedAggStages( * followed by an expand/join. */ object GpuUnboundedToUnboundedAggWindowIterator { - - private def rideAlongProjection(windowOps: Seq[NamedExpression], + def rideAlongProjection(windowOps: Seq[NamedExpression], childOutput: Seq[Attribute]): (Seq[Attribute], Seq[GpuExpression]) = { val rideAlong = windowOps.filter { case GpuAlias(_: AttributeReference, _) | _: AttributeReference => true @@ -217,7 +262,7 @@ object GpuUnboundedToUnboundedAggWindowIterator { } - private def tmpAggregationOps(windowOps: Seq[NamedExpression], + def tmpAggregationOps(windowOps: Seq[NamedExpression], childOutput: Seq[Attribute]): (Seq[Attribute], Seq[GpuExpression]) = { // TODO I don't know what this is really going to look like. I am just doing an approximation // here so I can get the output of the aggregations after everything is done for the @@ -249,7 +294,7 @@ object GpuUnboundedToUnboundedAggWindowIterator { (aggregationsOutput, boundAggregations) } - private def repeatOps( + def repeatOps( aggregationsOutput: Seq[Attribute]): (GpuExpression, Seq[Attribute], Seq[GpuExpression]) = { // It is assumed that the last aggregation column is a count that we will use for repeat // If that ever changes, this code needs to be updated. diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala new file mode 100644 index 00000000000..9c05fe00e94 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.window + +import ai.rapids.cudf.{ColumnVector, Table} +import com.nvidia.spark.rapids.{GpuColumnVector, NoopMetric, RmmSparkRetrySuiteBase, SpillableColumnarBatch, SpillPriorities} +import com.nvidia.spark.rapids.Arm.withResource +import java.util + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.types.{DataType, IntegerType, LongType, ShortType} + +class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { + test("basic repeat test") { + // First I need to setup the operations. I am trying to test repeat in isolation + // so we are not going to build them up using the front door + val aggOutput = Seq(AttributeReference("my_max", IntegerType, true)(), + AttributeReference("_count", LongType, true)()) + + val rideAlongOutput = Seq(AttributeReference("a", ShortType, true)()) + val (boundCount, repeatOutput, boundRepeat) = + GpuUnboundedToUnboundedAggWindowIterator.repeatOps(aggOutput) + + val finalProject = GpuUnboundedToUnboundedAggWindowIterator.computeFinalProject( + rideAlongOutput, repeatOutput, repeatOutput ++ rideAlongOutput) + + val conf = GpuUnboundedToUnboundedAggStages(Seq.empty, Seq.empty, + boundCount, boundRepeat, finalProject) + + def makeRepeatCb(): SpillableColumnarBatch = { + // very basic test to verify that the repeat stage works properly. + val table = withResource(ColumnVector.fromInts(1, 2)) { data1 => + withResource(ColumnVector.fromLongs(2, 3)) { counts => + new Table(data1, counts) + } + } + withResource(table) { _ => + SpillableColumnarBatch( + GpuColumnVector.from(table, Array[DataType](IntegerType, LongType)), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + } + + def makeRideAlongCb(): SpillableColumnarBatch = { + // very basic test to verify that the repeat stage works properly. + val table = withResource(ColumnVector.fromShorts(1, 2, 3, 4, 5)) { data1 => + new Table(data1) + } + withResource(table) { _ => + SpillableColumnarBatch( + GpuColumnVector.from(table, Array[DataType](ShortType)), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + + } + } + val rideAlongList = new util.LinkedList[SpillableColumnarBatch] + rideAlongList.add(makeRideAlongCb()) + val inputIter = Seq(SecondPassAggResult(rideAlongList, makeRepeatCb())).toIterator + val repeatIter = new GpuUnboundedToUnboundedAggFinalIterator(inputIter, conf, + NoopMetric, NoopMetric, NoopMetric) + + assert(repeatIter.hasNext) + withResource(repeatIter.next()) { result => + assert(result.numCols() == 2) + } + } +} From d1f2e591978c469cc753ff2c85b84ad4634eab98 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 12 Jan 2024 10:35:45 -0600 Subject: [PATCH 07/14] Added in some more options to try and slice the agg data --- ...GpuUnboundedToUnboundedAggWindowExec.scala | 379 ++++++++++++++++-- ...puUnboundedToUnboundedAggWindowSuite.scala | 6 +- 2 files changed, 338 insertions(+), 47 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index ba0e9c331bb..ebe1d62ef8c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -16,9 +16,14 @@ package com.nvidia.spark.rapids.window -import com.nvidia.spark.rapids.{GpuAlias, GpuBindReferences, GpuColumnVector, GpuExpression, GpuLiteral, GpuMetric, GpuProjectExec, SpillableColumnarBatch, SpillPriorities} +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import ai.rapids.cudf +import com.nvidia.spark.rapids.{ConcatAndConsumeAll, GpuAlias, GpuBindReferences, GpuColumnVector, GpuExpression, GpuLiteral, GpuMetric, GpuProjectExec, SpillableColumnarBatch, SpillPriorities} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry} +import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq +import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import java.util @@ -27,18 +32,42 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.rapids.aggregate.{GpuAggregateExpression, GpuCount} -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.types.{DataType, LongType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + + +/** + * Just a simple wrapper to make working with buffers of AutoClosable things play + * nicely with withResource. + */ +class AutoClosableArrayBuffer[T <: AutoCloseable]() extends AutoCloseable { + private val data = new ArrayBuffer[T]() + + def append(scb: T): Unit = data.append(scb) + + def last: T = data.last + + def removeLast(): T = data.remove(data.length - 1) + def foreach[U](f: T => U): Unit = data.foreach(f) + + def toArray[B >: T : ClassTag]: Array[B] = data.toArray + + override def close(): Unit = { + data.foreach(_.close()) + data.clear() + } +} // It is not really simple to do a single iterator that can do the splits and retries along with // The data as needed. Instead we are going to decompose the problem into multiple iterators that // feed into each other. // The first pass iterator will take in a batch of data and produce one or more aggregated result -// pairs that include the ridealong columns with the aggregation results for that batch. +// pairs that include the ride-along columns with the aggregation results for that batch. // Note that it is assumed that the aggregation was done as a sort based aggregation, so -// the ridealong columns and the aggregation result should both be sorted by the partition by +// the ride-along columns and the aggregation result should both be sorted by the partition by // columns. Also the aggregation result must have a count column so it can be expanded using -// repeat to get back to the size of the ridealong columns. +// repeat to get back to the size of the ride-along columns. case class FirstPassAggResult(rideAlongColumns: SpillableColumnarBatch, aggResult: SpillableColumnarBatch) extends AutoCloseable { override def close(): Unit = { @@ -80,11 +109,16 @@ class GpuUnboundedToUnboundedAggWindowFirstPassIterator( // the result depending on if it knows that the group by keys is complete or not. // Completed data will have the aggregation results merged into a single aggregation result // Note that this aggregation result needs to remain sorted. The result is returned as -// an iterator of ridealong columns, and the full agg results for those columns. It is not +// an iterator of ride-along columns, and the full agg results for those columns. It is not // the responsibility of the second stage to try and combine small batches or split up large // ones, beyond what the retry framework might do. case class SecondPassAggResult(rideAlongColumns: util.LinkedList[SpillableColumnarBatch], - aggResult: SpillableColumnarBatch) { + aggResult: SpillableColumnarBatch) extends AutoCloseable { + override def close(): Unit = { + rideAlongColumns.forEach(_.close()) + rideAlongColumns.clear() + aggResult.close() + } } class GpuUnboundedToUnboundedAggWindowSecondPassIterator( @@ -128,9 +162,259 @@ class GpuUnboundedToUnboundedAggWindowSecondPassIterator( } } -// The final step is to take the original input data along with the agg data, estimate how -// to split/combine the input batches to output batches that are close to the target batch size -// Then expand the data to match that size, combine everything together and return the result. +// The next to final step is to take the original input data along with the agg data, estimate how +// to split/combine the input batches to output batches that are close to the target batch size. + +case class SlicedBySize(rideAlongColumns: SpillableColumnarBatch, + aggResults: SpillableColumnarBatch) extends AutoCloseable { + override def close(): Unit = { + rideAlongColumns.close() + aggResults.close() + } +} + +object PendingSecondAggResults { + def apply(result: SecondPassAggResult, + boundStages: GpuUnboundedToUnboundedAggStages, + targetSizeBytes: Long): PendingSecondAggResults = { + closeOnExcept(result) { _ => + new PendingSecondAggResults(result.rideAlongColumns, result.aggResult, + boundStages, targetSizeBytes) + } + } + + def makeBatch(columns: Array[cudf.ColumnVector], types: Array[DataType]): ColumnarBatch = { + val tmp = columns.zip(types).map { + case (c, t) => GpuColumnVector.from(c, t).asInstanceOf[ColumnVector] + } + new ColumnarBatch(tmp, columns(0).getRowCount.toInt) + } + + def splitCb(cb: ColumnarBatch, inclusiveCutPoint: Int): (ColumnarBatch, ColumnarBatch) = { + // First save the types + val types = GpuColumnVector.extractTypes(cb) + // Slice is at the column level, not at a table level + closeOnExcept(new ArrayBuffer[cudf.ColumnVector]()) { before => + val afterCb = closeOnExcept(new ArrayBuffer[cudf.ColumnVector]()) { after => + withResource(GpuColumnVector.extractBases(cb)) { bases => + bases.foreach { base => + val result = base.split(inclusiveCutPoint) + before.append(result(0)) + after.append(result(1)) + assert(result.length == 2) + } + } + makeBatch(after.toArray, types) + } + closeOnExcept(afterCb) { _ => + (makeBatch(before.toArray, types), afterCb) + } + } + } + + def concatBatchesAndClose(toConcat: AutoClosableArrayBuffer[SpillableColumnarBatch], + sparkTypes: Array[DataType]): SpillableColumnarBatch = { + val cb = withRetryNoSplit(toConcat) { _ => + closeOnExcept(new AutoClosableArrayBuffer[ColumnarBatch]) { cbs => + toConcat.foreach { scb => + cbs.append(scb.getColumnarBatch()) + } + // This consumes/closes the array of batches + ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(cbs.toArray, sparkTypes) + } + } + SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + } + + def splitAggResultByRepeatedRows(aggResult: SpillableColumnarBatch, + targetRows: Int, + totalRows: Long): (SpillableColumnarBatch, SpillableColumnarBatch) = { + // We have high confidence that we need to split this in two, but even then we don't + // have enough information here to know that we don't need to split it without + // processing the batch + withResource(aggResult.getColumnarBatch()) { cb => + if (cb.numRows() == 1) { + // This is a very common special case where there is one and only one row, so + // we need to keep all of the columns the same, but slice the count row accordingly. + withResource(AggResultBatchConventions.getRepeatedAggColumns(cb)) { aggs => + // The aggs are just repeated, but the count is new + val firstPart = withResource(cudf.ColumnVector.fromLongs(targetRows)) { count => + AggResultBatchConventions.appendCountColumn(aggs, count) + } + val secondPart = closeOnExcept(firstPart) { _ => + withResource(cudf.ColumnVector.fromLongs(totalRows - targetRows)) { + count => + AggResultBatchConventions.appendCountColumn(aggs, count) + } + } + (SpillableColumnarBatch(firstPart, SpillPriorities.ACTIVE_ON_DECK_PRIORITY), + SpillableColumnarBatch(secondPart, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + } + } else { + // This is a little complicated in the general case. We need to find which row + // in the aggregation we need to split on. The only way to do that is to get a + // running sum of the counts, and then do an upper bound on that column + withResource(AggResultBatchConventions.getCount(cb)) { counts => + throw new IllegalStateException("NOT IMPLEMENTED YET...") + } + } + } + } +} + +class PendingSecondAggResults private( + private val rideAlongColumns: util.LinkedList[SpillableColumnarBatch], + private var aggResult: SpillableColumnarBatch, + private val boundStages: GpuUnboundedToUnboundedAggStages, + private val targetSizeBytes: Long) extends Iterator[SlicedBySize] with AutoCloseable { + import PendingSecondAggResults._ + + private var totalRowsInAgg = { + var total = 0L + rideAlongColumns.forEach(total += _.numRows()) + total + } + + override def hasNext: Boolean = !rideAlongColumns.isEmpty + + /** + * We want to estimate the average size per row that the aggregations will add. This + * does not have to be perfect because we will back it up with a split and retry handling + * that can slice the output in half. We are also going to include the count column because + * I don't want to read the data back, if it spilled. + */ + private def estimateAggSizePerRow: Double = + aggResult.sizeInBytes.toDouble / aggResult.numRows() + + /** + * Gets the next batch of ride along columns to process. + */ + private def getRideAlongToProcess(): SpillableColumnarBatch = { + val averageAggSizePerRow = estimateAggSizePerRow + var currentSize = 0L + var numRowsTotal = 0 + + // First pull in the batches that might be enough to process + val toProcess = new AutoClosableArrayBuffer[SpillableColumnarBatch]() + closeOnExcept(toProcess) { _ => + while (currentSize < targetSizeBytes && !rideAlongColumns.isEmpty) { + val scb = rideAlongColumns.pop() + toProcess.append(scb) + val numRows = scb.numRows() + val estimatedSize = (scb.sizeInBytes + (numRows * averageAggSizePerRow)).toLong + numRowsTotal += numRows + currentSize += estimatedSize + } + + if (currentSize > targetSizeBytes) { + // If we buffered too much data we need to decide how to slice it, but we only + // want to slice the last batch in toProcess because we know that the batch before + // it was not large enough to send us over the limit. We do this by estimating how + // many rows we need from toProcess and hence how many rows we need to remove. + val avgSizePerRow = currentSize.toDouble / numRowsTotal + val estimatedRowsToKeep = math.ceil(targetSizeBytes / avgSizePerRow).toLong + val estimatedRowsToRemove = numRowsTotal - estimatedRowsToKeep + + // If we need to remove more rows, than the last batch has, we just remove the last batch + val numRowsToRemove = if (estimatedRowsToRemove >= toProcess.last.numRows) { + val theLastOne = toProcess.removeLast() + rideAlongColumns.addFirst(theLastOne) + // We probably don't need to update numRowsTotal, but it is just to be defensive + numRowsTotal -= theLastOne.numRows() + 0 + } else { + numRowsTotal - estimatedRowsToKeep + } + + if (numRowsToRemove > 0) { + // We need to slice the last batch + val theLastOne = toProcess.removeLast() + val numRowsToKeepInLastBatch = (theLastOne.numRows() - numRowsToRemove).toInt + val (keep, forNextTime) = withRetryNoSplit(theLastOne) { _ => + withResource(theLastOne.getColumnarBatch()) { cb => + splitCb(cb, numRowsToKeepInLastBatch) + } + } + rideAlongColumns.addFirst(SpillableColumnarBatch(forNextTime, + SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + + toProcess.append(SpillableColumnarBatch(keep, + SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + } + } + } + concatBatchesAndClose(toProcess, boundStages.boundRideAlong.map(_.dataType).toArray) + } + + def getSlicedAggResultByRepeatedRows(numDesiredRows: Int): SpillableColumnarBatch = { + val (ret, keep) = withRetryNoSplit(aggResult) { _ => + splitAggResultByRepeatedRows(aggResult, numDesiredRows, totalRowsInAgg) + } + totalRowsInAgg -= numDesiredRows + aggResult = keep + ret + } + + override def next(): SlicedBySize = { + if (!hasNext) { + throw new NoSuchElementException() + } + closeOnExcept(getRideAlongToProcess()) { rideAlongScb => + if (rideAlongColumns.isEmpty) { + // This is the last batch so we don't need to even figure out where to slice + // the AggResult + SlicedBySize(rideAlongScb, aggResult.incRefCount()) + } else { + SlicedBySize(rideAlongScb, getSlicedAggResultByRepeatedRows(rideAlongScb.numRows())) + } + } + } + + override def close(): Unit = { + rideAlongColumns.forEach(_.close()) + rideAlongColumns.clear() + aggResult.close() + } +} + +/** + * Try to slice the input batches into right sized output. + */ +class GpuUnboundedToUnboundedAggSliceBySizeIterator( + input: Iterator[SecondPassAggResult], + boundStages: GpuUnboundedToUnboundedAggStages, + targetSizeBytes: Long) extends Iterator[SlicedBySize] { + + private var pending: Option[PendingSecondAggResults] = None + private def pendingHasNext: Boolean = pending.exists(_.hasNext) + + override def hasNext: Boolean = pendingHasNext || input.hasNext + + override def next(): SlicedBySize = { + if (!hasNext) { + throw new NoSuchElementException() + } + + if (!pendingHasNext) { + pending = Some(PendingSecondAggResults(input.next(), boundStages, targetSizeBytes)) + } + pending.get.next + } + + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + close() + } + } + + def close(): Unit = { + pending.foreach(_.close()) + pending = None + } +} + +// The final step is to expand the data to match that size, combine everything together and +// return the result. class GpuUnboundedToUnboundedAggFinalIterator( input: Iterator[SecondPassAggResult], @@ -196,12 +480,12 @@ class GpuUnboundedToUnboundedAggFinalIterator( val repeatedCb = withResource(pendingAggResults) { scb => opTime.ns { withResource(scb.getColumnarBatch()) { cb => - withResource(boundStages.boundCount.columnarEval(cb)) { counts => - withResource(GpuProjectExec.project(cb, boundStages.boundAggsToRepeat)) { toRepeat => + withResource(AggResultBatchConventions.getCount(cb)) { counts => + withResource(AggResultBatchConventions.getRepeatedAggColumns(cb)) { toRepeat => + val dataTypes = GpuColumnVector.extractTypes(toRepeat) withResource(GpuColumnVector.from(toRepeat)) { table => withResource(table.repeat(counts.getBase)) { repeated => - GpuColumnVector.from(repeated, - boundStages.boundAggsToRepeat.map(_.dataType).toArray) + GpuColumnVector.from(repeated, dataTypes) } } } @@ -234,17 +518,40 @@ class GpuUnboundedToUnboundedAggFinalIterator( * @param boundRideAlong used for a project that pulls out columns that are passing through * unchanged. * @param boundAggregations aggregations to be done. NOTE THIS IS WIP - * @param boundCount The column that contains the count in it for the number of aggregations - * @param boundAggsToRepeat the columns to get that need to be repeated by the amount in count * @param boundFinalProject the final project to get the output in the right order */ case class GpuUnboundedToUnboundedAggStages( boundRideAlong: Seq[GpuExpression], boundAggregations: Seq[GpuExpression], - boundCount: GpuExpression, - boundAggsToRepeat: Seq[GpuExpression], boundFinalProject: Seq[GpuExpression]) extends Serializable +object AggResultBatchConventions { + private def getColumnFromBatch(cb: ColumnarBatch, colId: Int): ColumnVector = { + val ret = cb.column(colId) + ret.asInstanceOf[GpuColumnVector].incRefCount() + ret + } + + def getCount(cb: ColumnarBatch): GpuColumnVector = { + // By convention the last column is the count column + getColumnFromBatch(cb, cb.numCols() - 1).asInstanceOf[GpuColumnVector] + } + + def getRepeatedAggColumns(cb: ColumnarBatch): ColumnarBatch = { + // By convention all of the columns, except the last one are agg columns + val columns = (0 until cb.numCols() - 1).safeMap { index => + getColumnFromBatch(cb, index) + } + new ColumnarBatch(columns.toArray, cb.numRows()) + } + + def appendCountColumn(repeatedAggColumns: ColumnarBatch, + counts: cudf.ColumnVector): ColumnarBatch = { + val countCol = GpuColumnVector.fromChecked(counts.incRefCount(), LongType) + GpuColumnVector.appendColumns(repeatedAggColumns, countCol) + } +} + /** * An iterator that can do unbounded to unbounded window aggregations as group by aggregations * followed by an expand/join. @@ -285,31 +592,18 @@ object GpuUnboundedToUnboundedAggWindowIterator { s"Found unexpected expression $other in window exec ${other.getClass}") } :+ GpuAlias(GpuCount(Seq(GpuLiteral(1L))), "_count")() // Later code by conventions "knows" that the last column is a count and that it can be - // thrown away. If we ever dedupe this with a COUNT(1) that already exists, then - // we need to update the output of this to have a clean way to say what is the count, - // and if that count is needed see repeatOps + // thrown away. We should never try and dedupe this count with an existing count column, + // because if we need to slice the aggregation results we will modify the count column + // to do that. This will not work if we are going to output that count column. val aggregationsOutput = windowAggs.map(_.toAttribute) val boundAggregations = GpuBindReferences.bindGpuReferences(windowAggs, childOutput) (aggregationsOutput, boundAggregations) } - def repeatOps( - aggregationsOutput: Seq[Attribute]): (GpuExpression, Seq[Attribute], Seq[GpuExpression]) = { - // It is assumed that the last aggregation column is a count that we will use for repeat - // If that ever changes, this code needs to be updated. - val aggOutputExpressions = aggregationsOutput.map { attr => - GpuAlias( - AttributeReference(attr.name, attr.dataType, attr.nullable)(attr.exprId), - attr.name)(attr.exprId) - } - val boundAggOutputExpressions = - GpuBindReferences.bindGpuReferences(aggOutputExpressions, aggregationsOutput) - - val boundCount = boundAggOutputExpressions.last - val aggsToRepeat = boundAggOutputExpressions.slice(0, boundAggOutputExpressions.length - 1) - val aggsToRepeatOutput = aggregationsOutput.slice(0, aggregationsOutput.length - 1) - (boundCount, aggsToRepeatOutput, aggsToRepeat) + def repeatOps(aggregationsOutput: Seq[Attribute]): Seq[Attribute] = { + // By convention the last column in the aggs is the count column we want to use + aggregationsOutput.slice(0, aggregationsOutput.length - 1) } def computeFinalProject(rideAlongOutput: Seq[Attribute], @@ -339,17 +633,16 @@ object GpuUnboundedToUnboundedAggWindowIterator { // STEP 2. project that will pull out the columns needed for the aggregation. val (aggregationsOutput, boundAggregations) = tmpAggregationOps(windowOps, childOutput) - // STEP N: Given the output of the aggregations get count column and the other - // columns so we can do the repeat call. - val (boundCount, aggsToRepeatOutput, aggsToRepeat) = repeatOps(aggregationsOutput) + // STEP N: Given the output of the aggregations get the aggregations without that count. + // The count and aggs locations is by convention. + val aggsToRepeatOutput = repeatOps(aggregationsOutput) // STEP N + 1: After the repeat is done the repeated columns are put at the end of the // rideAlong columns and then we need to do a project that would put them all in the // proper output order, according to the windowOps val finalProject = computeFinalProject(rideAlongOutput, aggsToRepeatOutput, windowOps) - GpuUnboundedToUnboundedAggStages(boundRideAlong, boundAggregations, - boundCount, aggsToRepeat, finalProject) + GpuUnboundedToUnboundedAggStages(boundRideAlong, boundAggregations, finalProject) } def apply(input: Iterator[ColumnarBatch], diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 9c05fe00e94..22e9ae24705 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -32,14 +32,12 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { AttributeReference("_count", LongType, true)()) val rideAlongOutput = Seq(AttributeReference("a", ShortType, true)()) - val (boundCount, repeatOutput, boundRepeat) = - GpuUnboundedToUnboundedAggWindowIterator.repeatOps(aggOutput) + val repeatOutput = GpuUnboundedToUnboundedAggWindowIterator.repeatOps(aggOutput) val finalProject = GpuUnboundedToUnboundedAggWindowIterator.computeFinalProject( rideAlongOutput, repeatOutput, repeatOutput ++ rideAlongOutput) - val conf = GpuUnboundedToUnboundedAggStages(Seq.empty, Seq.empty, - boundCount, boundRepeat, finalProject) + val conf = GpuUnboundedToUnboundedAggStages(Seq.empty, Seq.empty, finalProject) def makeRepeatCb(): SpillableColumnarBatch = { // very basic test to verify that the repeat stage works properly. From 16720b12f5238314a591791affbed1d5188d5ba1 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 12 Jan 2024 11:07:58 -0600 Subject: [PATCH 08/14] Finished putting these pieces back together again --- ...GpuUnboundedToUnboundedAggWindowExec.scala | 148 +++++++----------- .../rapids/window/GpuWindowExecMeta.scala | 13 +- ...puUnboundedToUnboundedAggWindowSuite.scala | 10 +- 3 files changed, 71 insertions(+), 100 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index ebe1d62ef8c..28c4f49b187 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -176,10 +176,11 @@ case class SlicedBySize(rideAlongColumns: SpillableColumnarBatch, object PendingSecondAggResults { def apply(result: SecondPassAggResult, boundStages: GpuUnboundedToUnboundedAggStages, - targetSizeBytes: Long): PendingSecondAggResults = { + targetSizeBytes: Long, + opTime: GpuMetric): PendingSecondAggResults = { closeOnExcept(result) { _ => new PendingSecondAggResults(result.rideAlongColumns, result.aggResult, - boundStages, targetSizeBytes) + boundStages, targetSizeBytes, opTime) } } @@ -213,14 +214,17 @@ object PendingSecondAggResults { } def concatBatchesAndClose(toConcat: AutoClosableArrayBuffer[SpillableColumnarBatch], - sparkTypes: Array[DataType]): SpillableColumnarBatch = { + sparkTypes: Array[DataType], + opTime: GpuMetric): SpillableColumnarBatch = { val cb = withRetryNoSplit(toConcat) { _ => - closeOnExcept(new AutoClosableArrayBuffer[ColumnarBatch]) { cbs => - toConcat.foreach { scb => - cbs.append(scb.getColumnarBatch()) + opTime.ns { + closeOnExcept(new AutoClosableArrayBuffer[ColumnarBatch]) { cbs => + toConcat.foreach { scb => + cbs.append(scb.getColumnarBatch()) + } + // This consumes/closes the array of batches + ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(cbs.toArray, sparkTypes) } - // This consumes/closes the array of batches - ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(cbs.toArray, sparkTypes) } } SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) @@ -266,7 +270,8 @@ class PendingSecondAggResults private( private val rideAlongColumns: util.LinkedList[SpillableColumnarBatch], private var aggResult: SpillableColumnarBatch, private val boundStages: GpuUnboundedToUnboundedAggStages, - private val targetSizeBytes: Long) extends Iterator[SlicedBySize] with AutoCloseable { + private val targetSizeBytes: Long, + opTime: GpuMetric) extends Iterator[SlicedBySize] with AutoCloseable { import PendingSecondAggResults._ private var totalRowsInAgg = { @@ -331,8 +336,10 @@ class PendingSecondAggResults private( val theLastOne = toProcess.removeLast() val numRowsToKeepInLastBatch = (theLastOne.numRows() - numRowsToRemove).toInt val (keep, forNextTime) = withRetryNoSplit(theLastOne) { _ => - withResource(theLastOne.getColumnarBatch()) { cb => - splitCb(cb, numRowsToKeepInLastBatch) + opTime.ns { + withResource(theLastOne.getColumnarBatch()) { cb => + splitCb(cb, numRowsToKeepInLastBatch) + } } } rideAlongColumns.addFirst(SpillableColumnarBatch(forNextTime, @@ -343,7 +350,7 @@ class PendingSecondAggResults private( } } } - concatBatchesAndClose(toProcess, boundStages.boundRideAlong.map(_.dataType).toArray) + concatBatchesAndClose(toProcess, boundStages.boundRideAlong.map(_.dataType).toArray, opTime) } def getSlicedAggResultByRepeatedRows(numDesiredRows: Int): SpillableColumnarBatch = { @@ -383,7 +390,8 @@ class PendingSecondAggResults private( class GpuUnboundedToUnboundedAggSliceBySizeIterator( input: Iterator[SecondPassAggResult], boundStages: GpuUnboundedToUnboundedAggStages, - targetSizeBytes: Long) extends Iterator[SlicedBySize] { + targetSizeBytes: Long, + opTime: GpuMetric) extends Iterator[SlicedBySize] { private var pending: Option[PendingSecondAggResults] = None private def pendingHasNext: Boolean = pending.exists(_.hasNext) @@ -396,7 +404,7 @@ class GpuUnboundedToUnboundedAggSliceBySizeIterator( } if (!pendingHasNext) { - pending = Some(PendingSecondAggResults(input.next(), boundStages, targetSizeBytes)) + pending = Some(PendingSecondAggResults(input.next(), boundStages, targetSizeBytes, opTime)) } pending.get.next } @@ -417,96 +425,50 @@ class GpuUnboundedToUnboundedAggSliceBySizeIterator( // return the result. class GpuUnboundedToUnboundedAggFinalIterator( - input: Iterator[SecondPassAggResult], + input: Iterator[SlicedBySize], boundStages: GpuUnboundedToUnboundedAggStages, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric) extends Iterator[ColumnarBatch] { - private var pending: Option[SecondPassAggResult] = None - Option(TaskContext.get()).foreach { tc => - onTaskCompletion(tc) { - closePending() - } - } - - private def hasMoreInPending: Boolean = pending.exists(!_.rideAlongColumns.isEmpty) - private def pendingAggResults: SpillableColumnarBatch = pending.get.aggResult.incRefCount() - private def nextPendingRideAlong: SpillableColumnarBatch = pending.get.rideAlongColumns.pop - private def closePending(): Unit = { - pending.foreach(_.aggResult.close()) - pending.foreach(_.rideAlongColumns.forEach(_.close())) - pending = None - } - private def replacePending(p: SecondPassAggResult): Unit = { - closePending() - pending = Some(p) - } - - override def hasNext: Boolean = hasMoreInPending || input.hasNext + override def hasNext: Boolean = input.hasNext override def next(): ColumnarBatch = { - // TODO we need to add in the retry code, and pre-splitting of the data if possible, but - // for now we are just going to try it. if (!hasNext) { throw new NoSuchElementException() } - while (!hasMoreInPending) { - replacePending(input.next()) - } - - // TODO this is a very dumb version right now that is not checking for size - // That will be added later on. + // TODO we need to add in the split to the retry - // TODO fix this. We don't want just one batch of ride along columns, and we don't - // want to leak anything if we run out of memory - var rideAlongCombined: ColumnarBatch = null - while (hasMoreInPending) { - val cb = withResource(nextPendingRideAlong) { scb => - scb.getColumnarBatch() - } - withResource(cb) { _ => - if (rideAlongCombined == null) { - rideAlongCombined = GpuColumnVector.incRefCounts(cb) - } else { - rideAlongCombined.close() - throw new IllegalStateException("Concat not implemented yet...") - } - } - } - - // The first stage is to expand the aggregate based on the count column - val combined = withResource(rideAlongCombined) { _ => - val repeatedCb = withResource(pendingAggResults) { scb => - opTime.ns { - withResource(scb.getColumnarBatch()) { cb => - withResource(AggResultBatchConventions.getCount(cb)) { counts => - withResource(AggResultBatchConventions.getRepeatedAggColumns(cb)) { toRepeat => - val dataTypes = GpuColumnVector.extractTypes(toRepeat) - withResource(GpuColumnVector.from(toRepeat)) { table => - withResource(table.repeat(counts.getBase)) { repeated => - GpuColumnVector.from(repeated, dataTypes) - } + withRetryNoSplit(input.next()) { toExpand => + opTime.ns { + // The first stage is to expand the aggregate based on the count column + val repeatedAggs = withResource(toExpand.aggResults.getColumnarBatch()) { cb => + withResource(AggResultBatchConventions.getCount(cb)) { counts => + withResource(AggResultBatchConventions.getRepeatedAggColumns(cb)) { toRepeat => + val dataTypes = GpuColumnVector.extractTypes(toRepeat) + withResource(GpuColumnVector.from(toRepeat)) { table => + withResource(table.repeat(counts.getBase)) { repeated => + GpuColumnVector.from(repeated, dataTypes) } } } } } - } - // Second step is to stitch the two together - withResource(repeatedCb) { _ => - opTime.ns { - GpuColumnVector.appendColumns(rideAlongCombined, - GpuColumnVector.extractColumns(repeatedCb): _*) + // Second step is to stitch the two together + val combined = withResource(repeatedAggs) { _ => + withResource(toExpand.rideAlongColumns.getColumnarBatch()) { rideAlong => + GpuColumnVector.appendColumns(rideAlong, + GpuColumnVector.extractColumns(repeatedAggs): _*) + } } - } - } - withResource(combined) { _ => - opTime.ns { - closeOnExcept(GpuProjectExec.project(combined, boundStages.boundFinalProject)) { ret => - numOutputBatches += 1 - numOutputRows += ret.numRows() - ret + withResource(combined) { _ => + opTime.ns { + closeOnExcept(GpuProjectExec.project(combined, boundStages.boundFinalProject)) { ret => + numOutputBatches += 1 + numOutputRows += ret.numRows() + ret + } + } } } } @@ -649,12 +611,15 @@ object GpuUnboundedToUnboundedAggWindowIterator { boundStages: GpuUnboundedToUnboundedAggStages, numOutputBatches: GpuMetric, numOutputRows: GpuMetric, - opTime: GpuMetric): Iterator[ColumnarBatch] = { + opTime: GpuMetric, + targetSizeBytes: Long): Iterator[ColumnarBatch] = { val firstPass = new GpuUnboundedToUnboundedAggWindowFirstPassIterator(input, boundStages, opTime) val secondPass = new GpuUnboundedToUnboundedAggWindowSecondPassIterator(firstPass, boundStages, opTime) - new GpuUnboundedToUnboundedAggFinalIterator(secondPass, boundStages, + val slicedBySize = new GpuUnboundedToUnboundedAggSliceBySizeIterator(secondPass, + boundStages, targetSizeBytes, opTime) + new GpuUnboundedToUnboundedAggFinalIterator(slicedBySize, boundStages, numOutputBatches, numOutputRows, opTime) } } @@ -671,7 +636,8 @@ case class GpuUnboundedToUnboundedAggWindowExec( gpuOrderSpec: Seq[SortOrder], child: SparkPlan)( override val cpuPartitionSpec: Seq[Expression], - override val cpuOrderSpec: Seq[SortOrder]) extends GpuWindowBaseExec { + override val cpuOrderSpec: Seq[SortOrder], + targetSizeBytes: Long) extends GpuWindowBaseExec { override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil @@ -691,7 +657,7 @@ case class GpuUnboundedToUnboundedAggWindowExec( child.executeColumnar().mapPartitions { iter => GpuUnboundedToUnboundedAggWindowIterator(iter, boundStages, - numOutputBatches, numOutputRows, opTime) + numOutputBatches, numOutputRows, opTime, targetSizeBytes) } } } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala index 77e8ae679d3..08b695dc10f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuWindowExecMeta.scala @@ -154,7 +154,8 @@ abstract class GpuBaseWindowExecMeta[WindowExecType <: SparkPlan] (windowExec: W orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), input, getPartitionSpecs, - getOrderSpecs) + getOrderSpecs, + conf) } else { new GpuWindowExec( fixedUpWindowOps, @@ -269,12 +270,13 @@ case class BatchedOps(running: Seq[NamedExpression], gpuOrderSpec: Seq[SortOrder], child: SparkPlan, cpuPartitionSpec: Seq[Expression], - cpuOrderSpec: Seq[SortOrder]): GpuExec = + cpuOrderSpec: Seq[SortOrder], + conf: RapidsConf): GpuExec = GpuUnboundedToUnboundedAggWindowExec( getUnboundedAggWithRunningAsPassthrough, gpuPartitionSpec, gpuOrderSpec, - child)(cpuPartitionSpec, cpuOrderSpec) + child)(cpuPartitionSpec, cpuOrderSpec, conf.gpuTargetBatchSizeBytes) private def getDoublePassWindowExec( gpuPartitionSpec: Seq[Expression], @@ -305,7 +307,8 @@ case class BatchedOps(running: Seq[NamedExpression], gpuOrderSpec: Seq[SortOrder], child: SparkPlan, cpuPartitionSpec: Seq[Expression], - cpuOrderSpec: Seq[SortOrder]): GpuExec = { + cpuOrderSpec: Seq[SortOrder], + conf: RapidsConf): GpuExec = { // The order of these matter so we can match the order of the parameters used to // create the various aggregation functions var currentPlan = child @@ -316,7 +319,7 @@ case class BatchedOps(running: Seq[NamedExpression], if (hasUnboundedAgg) { currentPlan = getUnboundedAggWindowExec(gpuPartitionSpec, gpuOrderSpec, currentPlan, - cpuPartitionSpec, cpuOrderSpec) + cpuPartitionSpec, cpuOrderSpec, conf) } if (hasDoublePass) { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 22e9ae24705..032ee5b997e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -28,10 +28,10 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { test("basic repeat test") { // First I need to setup the operations. I am trying to test repeat in isolation // so we are not going to build them up using the front door - val aggOutput = Seq(AttributeReference("my_max", IntegerType, true)(), - AttributeReference("_count", LongType, true)()) + val aggOutput = Seq(AttributeReference("my_max", IntegerType, nullable = true)(), + AttributeReference("_count", LongType, nullable = true)()) - val rideAlongOutput = Seq(AttributeReference("a", ShortType, true)()) + val rideAlongOutput = Seq(AttributeReference("a", ShortType, nullable = true)()) val repeatOutput = GpuUnboundedToUnboundedAggWindowIterator.repeatOps(aggOutput) val finalProject = GpuUnboundedToUnboundedAggWindowIterator.computeFinalProject( @@ -68,7 +68,9 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { val rideAlongList = new util.LinkedList[SpillableColumnarBatch] rideAlongList.add(makeRideAlongCb()) val inputIter = Seq(SecondPassAggResult(rideAlongList, makeRepeatCb())).toIterator - val repeatIter = new GpuUnboundedToUnboundedAggFinalIterator(inputIter, conf, + val splitIter = new GpuUnboundedToUnboundedAggSliceBySizeIterator(inputIter, conf, + 1024 * 1024 * 1024, NoopMetric) + val repeatIter = new GpuUnboundedToUnboundedAggFinalIterator(splitIter, conf, NoopMetric, NoopMetric, NoopMetric) assert(repeatIter.hasNext) From 54977de8a968b4968348355344f1ff1c31b9f015 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Tue, 16 Jan 2024 10:03:00 -0600 Subject: [PATCH 09/14] DEBUG --- .../spark/rapids/RapidsBufferCatalog.scala | 9 ++- .../spark/rapids/RapidsBufferStore.scala | 2 +- .../rapids/RapidsDeviceMemoryStore.scala | 3 +- .../spark/rapids/SpillableColumnarBatch.scala | 4 + ...GpuUnboundedToUnboundedAggWindowExec.scala | 27 +++---- .../spark/rapids/RmmSparkRetrySuiteBase.scala | 12 +++ ...puUnboundedToUnboundedAggWindowSuite.scala | 74 ++++++++++++++++--- 7 files changed, 103 insertions(+), 28 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index f98b52ae022..7149d1c525e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -81,6 +81,9 @@ class RapidsBufferCatalog( private var closed = false + override def toString: String = + s"buffer handle $id at $priority" + override def setSpillPriority(newPriority: Long): Unit = { priority = newPriority updateUnderlyingRapidsBuffer(this) @@ -725,7 +728,11 @@ class RapidsBufferCatalog( override def close(): Unit = { bufferIdToHandles.values.forEach { handles => - handles.foreach(_.close()) + handles.foreach{ h => + val tmp = bufferMap.get(h.id) + System.err.println(s"LOOKS LIKE YOU LEAKED $h / ${tmp.toList}") + h.close() + } } bufferIdToHandles.clear() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 98023259d82..460f9ec53c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -622,7 +622,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) releaseResources() } - override def toString: String = s"$name buffer size=${memoryUsedBytes}" + override def toString: String = s"$name buffer size=$memoryUsedBytes" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index c56806bc965..1abe28600d4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -113,10 +113,11 @@ class RapidsDeviceMemoryStore( buffer, initialSpillPriority) freeOnExcept(rapidsBuffer) { _ => - logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + + System.err.println(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + s"uncompressed=${rapidsBuffer.meta.bufferMeta.uncompressedSize}, " + s"meta_id=${tableMeta.bufferMeta.id}, " + s"meta_size=${tableMeta.bufferMeta.size}]") + new Exception(s"ST_$id").printStackTrace(System.err) addBuffer(rapidsBuffer, needsSync) rapidsBuffer } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 4894fc82527..f3698badf83 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -92,6 +92,7 @@ class SpillableColumnarBatchImpl ( rowCount: Int, sparkTypes: Array[DataType]) extends SpillableColumnarBatch { + System.err.println(s"CREATED NEW $this") private var refCount = 1 override def dataTypes: Array[DataType] = sparkTypes @@ -146,6 +147,9 @@ class SpillableColumnarBatchImpl ( // throw new IllegalStateException("Double free on SpillableColumnarBatchImpl") // } } + + override def toString: String = + s"SCB $handle $rowCount ${sparkTypes.toList} $refCount" } class JustRowsHostColumnarBatch(numRows: Int) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index 28c4f49b187..21be94901c6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -197,13 +197,11 @@ object PendingSecondAggResults { // Slice is at the column level, not at a table level closeOnExcept(new ArrayBuffer[cudf.ColumnVector]()) { before => val afterCb = closeOnExcept(new ArrayBuffer[cudf.ColumnVector]()) { after => - withResource(GpuColumnVector.extractBases(cb)) { bases => - bases.foreach { base => - val result = base.split(inclusiveCutPoint) - before.append(result(0)) - after.append(result(1)) - assert(result.length == 2) - } + GpuColumnVector.extractBases(cb).foreach { base => + val result = base.split(inclusiveCutPoint) + before.append(result(0)) + after.append(result(1)) + assert(result.length == 2) } makeBatch(after.toArray, types) } @@ -214,17 +212,18 @@ object PendingSecondAggResults { } def concatBatchesAndClose(toConcat: AutoClosableArrayBuffer[SpillableColumnarBatch], - sparkTypes: Array[DataType], opTime: GpuMetric): SpillableColumnarBatch = { val cb = withRetryNoSplit(toConcat) { _ => opTime.ns { - closeOnExcept(new AutoClosableArrayBuffer[ColumnarBatch]) { cbs => + val ready = closeOnExcept(new AutoClosableArrayBuffer[ColumnarBatch]) { cbs => toConcat.foreach { scb => cbs.append(scb.getColumnarBatch()) } - // This consumes/closes the array of batches - ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(cbs.toArray, sparkTypes) + cbs.toArray } + // This consumes/closes the array of batches + ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(ready, + GpuColumnVector.extractTypes(ready.head)) } } SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) @@ -304,6 +303,7 @@ class PendingSecondAggResults private( closeOnExcept(toProcess) { _ => while (currentSize < targetSizeBytes && !rideAlongColumns.isEmpty) { val scb = rideAlongColumns.pop() + System.err.println(s"RIDE ALONG GOT $scb") toProcess.append(scb) val numRows = scb.numRows() val estimatedSize = (scb.sizeInBytes + (numRows * averageAggSizePerRow)).toLong @@ -350,7 +350,7 @@ class PendingSecondAggResults private( } } } - concatBatchesAndClose(toProcess, boundStages.boundRideAlong.map(_.dataType).toArray, opTime) + concatBatchesAndClose(toProcess, opTime) } def getSlicedAggResultByRepeatedRows(numDesiredRows: Int): SpillableColumnarBatch = { @@ -639,7 +639,8 @@ case class GpuUnboundedToUnboundedAggWindowExec( override val cpuOrderSpec: Seq[SortOrder], targetSizeBytes: Long) extends GpuWindowBaseExec { - override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: cpuOrderSpec :: Nil + override def otherCopyArgs: Seq[AnyRef] = + cpuPartitionSpec :: cpuOrderSpec :: new java.lang.Long(targetSizeBytes) :: Nil // For this we only need the data to be sorted by the partition columns, but // we don't change the input sort from the CPU yet. In some cases we might even diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala index 5776b2f99a8..aac1362fb8c 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala @@ -52,17 +52,29 @@ trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach { } override def afterEach(): Unit = { + System.err.println("A") super.afterEach() + System.err.println("B") SparkSession.getActiveSession.foreach(_.stop()) + System.err.println("C") SparkSession.clearActiveSession() + System.err.println("D") RmmSpark.removeAllCurrentThreadAssociation() + System.err.println("E") RmmSpark.clearEventHandler() + System.err.println("F") RapidsBufferCatalog.close() + System.err.println("G") GpuSemaphore.shutdown() + System.err.println("H") if (rmmWasInitialized) { + System.err.println("I.1") Rmm.shutdown() + System.err.println("I.2") } + System.err.println("J") HostAlloc.initialize(-1) + System.err.println("K") } private class BaseRmmEventHandler extends RmmEventHandler { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 032ee5b997e..665e98ecaab 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.window -import ai.rapids.cudf.{ColumnVector, Table} +import ai.rapids.cudf.{ColumnVector, Scalar, Table} import com.nvidia.spark.rapids.{GpuColumnVector, NoopMetric, RmmSparkRetrySuiteBase, SpillableColumnarBatch, SpillPriorities} import com.nvidia.spark.rapids.Arm.withResource import java.util @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.types.{DataType, IntegerType, LongType, ShortType} class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { - test("basic repeat test") { + def basicRepeatTest(numOutputRows: Long, rowsPerBatch: Int, targetSizeBytes: Int) : Unit = { // First I need to setup the operations. I am trying to test repeat in isolation // so we are not going to build them up using the front door val aggOutput = Seq(AttributeReference("my_max", IntegerType, nullable = true)(), @@ -42,7 +42,9 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { def makeRepeatCb(): SpillableColumnarBatch = { // very basic test to verify that the repeat stage works properly. val table = withResource(ColumnVector.fromInts(1, 2)) { data1 => - withResource(ColumnVector.fromLongs(2, 3)) { counts => + val firstBatchAmount = numOutputRows / 2 + val secondBatchAmount = numOutputRows - firstBatchAmount + withResource(ColumnVector.fromLongs(firstBatchAmount, secondBatchAmount)) { counts => new Table(data1, counts) } } @@ -53,29 +55,77 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { } } - def makeRideAlongCb(): SpillableColumnarBatch = { + def makeRideAlongCb(numRows: Int): SpillableColumnarBatch = { // very basic test to verify that the repeat stage works properly. - val table = withResource(ColumnVector.fromShorts(1, 2, 3, 4, 5)) { data1 => - new Table(data1) + val table = withResource(Scalar.fromShort(5.toShort)) { s => + withResource(ColumnVector.fromScalar(s, numRows)) { data1 => + new Table(data1) + } } withResource(table) { _ => SpillableColumnarBatch( GpuColumnVector.from(table, Array[DataType](ShortType)), SpillPriorities.ACTIVE_BATCHING_PRIORITY) - } } + val rideAlongList = new util.LinkedList[SpillableColumnarBatch] - rideAlongList.add(makeRideAlongCb()) + // TODO need a way not to leak rideAlongList + var rowsRemaining = numOutputRows + while (rowsRemaining > 0) { + val rowsToAdd = math.min(rowsRemaining, rowsPerBatch) + rowsRemaining -= rowsToAdd + rideAlongList.add(makeRideAlongCb(rowsToAdd.toInt)) + } val inputIter = Seq(SecondPassAggResult(rideAlongList, makeRepeatCb())).toIterator val splitIter = new GpuUnboundedToUnboundedAggSliceBySizeIterator(inputIter, conf, - 1024 * 1024 * 1024, NoopMetric) + targetSizeBytes, NoopMetric) val repeatIter = new GpuUnboundedToUnboundedAggFinalIterator(splitIter, conf, NoopMetric, NoopMetric, NoopMetric) - assert(repeatIter.hasNext) - withResource(repeatIter.next()) { result => - assert(result.numCols() == 2) + var numRowsActuallyOutput = 0L + while (repeatIter.hasNext) { + withResource(repeatIter.next()) { result => + numRowsActuallyOutput += result.numRows() + assert(result.numCols() == 2) + } + } + assert(numRowsActuallyOutput == numOutputRows) + } + + test("single batch repeat test") { + try { + basicRepeatTest(1000, 1000, 1024 * 1024 * 1024) + } finally { + System.gc() + System.gc() + } + } + + test("multi batch no split repeat test") { + try { + basicRepeatTest(1000, 100, 1024 * 1024 * 1024) + } finally { + System.gc() + System.gc() + } + } + + test("single batch with split repeat test") { + try { + basicRepeatTest(1000, 1000, 4 * 1024) + } finally { + System.gc() + System.gc() + } + } + + test("multi batch with split repeat test") { + try { + basicRepeatTest(1000, 100, 4 * 1024) + } finally { + System.gc() + System.gc() } } } From 44a4159895208433225128a62fc5a83b6052eef1 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 17 Jan 2024 12:27:28 -0600 Subject: [PATCH 10/14] Step --- .../spark/rapids/RapidsBufferCatalog.scala | 6 +- .../rapids/RapidsDeviceMemoryStore.scala | 3 +- .../spark/rapids/SpillableColumnarBatch.scala | 1 - ...GpuUnboundedToUnboundedAggWindowExec.scala | 127 ++++++++++++++++-- .../spark/rapids/GpuGenerateSuite.scala | 5 + .../spark/rapids/RmmSparkRetrySuiteBase.scala | 12 -- ...puUnboundedToUnboundedAggWindowSuite.scala | 25 +--- 7 files changed, 126 insertions(+), 53 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 7149d1c525e..42dd6050e88 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -728,11 +728,7 @@ class RapidsBufferCatalog( override def close(): Unit = { bufferIdToHandles.values.forEach { handles => - handles.foreach{ h => - val tmp = bufferMap.get(h.id) - System.err.println(s"LOOKS LIKE YOU LEAKED $h / ${tmp.toList}") - h.close() - } + handles.foreach(_.close()) } bufferIdToHandles.clear() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 1abe28600d4..c56806bc965 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -113,11 +113,10 @@ class RapidsDeviceMemoryStore( buffer, initialSpillPriority) freeOnExcept(rapidsBuffer) { _ => - System.err.println(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + + logDebug(s"Adding receive side table for: [id=$id, size=${buffer.getLength}, " + s"uncompressed=${rapidsBuffer.meta.bufferMeta.uncompressedSize}, " + s"meta_id=${tableMeta.bufferMeta.id}, " + s"meta_size=${tableMeta.bufferMeta.size}]") - new Exception(s"ST_$id").printStackTrace(System.err) addBuffer(rapidsBuffer, needsSync) rapidsBuffer } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index f3698badf83..d5216cbda9f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -92,7 +92,6 @@ class SpillableColumnarBatchImpl ( rowCount: Int, sparkTypes: Array[DataType]) extends SpillableColumnarBatch { - System.err.println(s"CREATED NEW $this") private var refCount = 1 override def dataTypes: Array[DataType] = sparkTypes diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index 21be94901c6..bb2bd0339f5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -53,6 +53,8 @@ class AutoClosableArrayBuffer[T <: AutoCloseable]() extends AutoCloseable { def toArray[B >: T : ClassTag]: Array[B] = data.toArray + override def toString: String = s"AutoCloseable(${super.toString})" + override def close(): Unit = { data.foreach(_.close()) data.clear() @@ -211,6 +213,65 @@ object PendingSecondAggResults { } } + def sliceInclusiveCb(cb: ColumnarBatch, inclusiveStart: Int, inclusiveEnd: Int): ColumnarBatch = { + // First save the types + val types = GpuColumnVector.extractTypes(cb) + // Slice is at the column level, not at a table level + closeOnExcept(new ArrayBuffer[cudf.ColumnVector]()) { cbs => + GpuColumnVector.extractBases(cb).foreach { base => + val result = base.slice(inclusiveStart, inclusiveEnd + 1) + cbs.append(result(0)) + assert(result.length == 1) + } + makeBatch(cbs.toArray, types) + } + } + + /** + * Makes a boolean vector where only one row is true. + * @param trueRow the row that should be true + * @param size the total number of rows. + */ + def makeSingleRowMask(trueRow: Int, size: Int): cudf.ColumnVector = { + assert(size > trueRow, s"$size > $trueRow") + // TODO probably want an optimization if the size is really small + val rowsBefore = trueRow + val rowsAfter = size - trueRow - 1 + if (rowsBefore == 0 && rowsAfter == 0) { + // Special Case where we cannot concat + cudf.ColumnVector.fromBooleans(true) + } else { + withResource(new AutoClosableArrayBuffer[cudf.ColumnView]) { toConcat => + withResource(cudf.Scalar.fromBool(false)) { fs => + if (rowsBefore > 0) { + toConcat.append(cudf.ColumnVector.fromScalar(fs, rowsBefore)) + } + toConcat.append(cudf.ColumnVector.fromBooleans(true)) + if (rowsAfter > 0) { + toConcat.append(cudf.ColumnVector.fromScalar(fs, rowsAfter)) + } + } + cudf.ColumnVector.concatenate(toConcat.toArray: _*) + } + } + } + + def replaceCountInAggAt(cb: ColumnarBatch, countRow: Int, newCount: Long): ColumnarBatch = { + // TODO I'm sure there is a lot we can do to optimize this, but this works... + withResource(AggResultBatchConventions.getRepeatedAggColumns(cb)) { aggColumns => + val newCountCv = withResource(AggResultBatchConventions.getCount(cb)) { count => + withResource(makeSingleRowMask(countRow, count.getRowCount.toInt)) { mask => + withResource(cudf.Scalar.fromLong(newCount)) { ncScalar => + mask.ifElse(ncScalar, count.getBase) + } + } + } + withResource(newCountCv) { _ => + AggResultBatchConventions.appendCountColumn(aggColumns, newCountCv) + } + } + } + def concatBatchesAndClose(toConcat: AutoClosableArrayBuffer[SpillableColumnarBatch], opTime: GpuMetric): SpillableColumnarBatch = { val cb = withRetryNoSplit(toConcat) { _ => @@ -258,7 +319,50 @@ object PendingSecondAggResults { // in the aggregation we need to split on. The only way to do that is to get a // running sum of the counts, and then do an upper bound on that column withResource(AggResultBatchConventions.getCount(cb)) { counts => - throw new IllegalStateException("NOT IMPLEMENTED YET...") + val (splitIndex, countToKeep, countForNextTime) = + withResource(counts.getBase.prefixSum()) { runningCount => + val splitIndex = withResource(new cudf.Table(runningCount)) { runningCountTable => + withResource(cudf.ColumnVector.fromLongs(targetRows)) { tr => + withResource(new cudf.Table(tr)) { targetRowsTable => + runningCountTable.lowerBound(Array(true), targetRowsTable, Array(false)) + } + } + } + withResource(splitIndex) { _ => + val indexToLookAt = withResource(splitIndex.getScalarElement(0)) { s => + s.getInt + } + val totalRowsUpToIndex = withResource( + runningCount.getScalarElement(indexToLookAt)) { s => + s.getLong + } + val countInRow = withResource(counts.getBase.getScalarElement(indexToLookAt)) { s => + s.getLong + } + val countToKeep = targetRows - (totalRowsUpToIndex - countInRow) + val countForNextTime = countInRow - countToKeep + (indexToLookAt, countToKeep, countForNextTime) + } + } + if (countForNextTime == 0) { + // We got lucky and it is on an agg boundary + val (a, b) = splitCb(cb, splitIndex) + (SpillableColumnarBatch(a, SpillPriorities.ACTIVE_ON_DECK_PRIORITY), + SpillableColumnarBatch(b, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + } else { + val scbFirst = withResource(sliceInclusiveCb(cb, 0, splitIndex)) { first => + SpillableColumnarBatch(replaceCountInAggAt(first, splitIndex, countToKeep), + SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + } + closeOnExcept(scbFirst) { _ => + val scbSecond = withResource(sliceInclusiveCb(cb, splitIndex, cb.numRows() - 1)) { + second => + SpillableColumnarBatch(replaceCountInAggAt(second, 0, countForNextTime), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + } + (scbFirst, scbSecond) + } + } } } } @@ -303,7 +407,6 @@ class PendingSecondAggResults private( closeOnExcept(toProcess) { _ => while (currentSize < targetSizeBytes && !rideAlongColumns.isEmpty) { val scb = rideAlongColumns.pop() - System.err.println(s"RIDE ALONG GOT $scb") toProcess.append(scb) val numRows = scb.numRows() val estimatedSize = (scb.sizeInBytes + (numRows * averageAggSizePerRow)).toLong @@ -406,7 +509,13 @@ class GpuUnboundedToUnboundedAggSliceBySizeIterator( if (!pendingHasNext) { pending = Some(PendingSecondAggResults(input.next(), boundStages, targetSizeBytes, opTime)) } - pending.get.next + val ret = pending.get.next() + // avoid leaks in the tests + if (!pendingHasNext) { + pending.get.close() + pending = None + } + ret } Option(TaskContext.get()).foreach { tc => @@ -462,12 +571,10 @@ class GpuUnboundedToUnboundedAggFinalIterator( } } withResource(combined) { _ => - opTime.ns { - closeOnExcept(GpuProjectExec.project(combined, boundStages.boundFinalProject)) { ret => - numOutputBatches += 1 - numOutputRows += ret.numRows() - ret - } + closeOnExcept(GpuProjectExec.project(combined, boundStages.boundFinalProject)) { ret => + numOutputBatches += 1 + numOutputRows += ret.numRows() + ret } } } @@ -509,7 +616,7 @@ object AggResultBatchConventions { def appendCountColumn(repeatedAggColumns: ColumnarBatch, counts: cudf.ColumnVector): ColumnarBatch = { - val countCol = GpuColumnVector.fromChecked(counts.incRefCount(), LongType) + val countCol = GpuColumnVector.fromChecked(counts, LongType) GpuColumnVector.appendColumns(repeatedAggColumns, countCol) } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala index d0f3339b425..e69f7c75118 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala @@ -278,6 +278,11 @@ class GpuGenerateSuite override def sizeInBytes: Long = spillable.sizeInBytes override def dataTypes: Array[DataType] = spillable.dataTypes override def close(): Unit = spillable.close() + + override def incRefCount(): SpillableColumnarBatch = { + spillable.incRefCount() + this + } } trait TestGenerator extends GpuExplodeBase { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala index aac1362fb8c..5776b2f99a8 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RmmSparkRetrySuiteBase.scala @@ -52,29 +52,17 @@ trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach { } override def afterEach(): Unit = { - System.err.println("A") super.afterEach() - System.err.println("B") SparkSession.getActiveSession.foreach(_.stop()) - System.err.println("C") SparkSession.clearActiveSession() - System.err.println("D") RmmSpark.removeAllCurrentThreadAssociation() - System.err.println("E") RmmSpark.clearEventHandler() - System.err.println("F") RapidsBufferCatalog.close() - System.err.println("G") GpuSemaphore.shutdown() - System.err.println("H") if (rmmWasInitialized) { - System.err.println("I.1") Rmm.shutdown() - System.err.println("I.2") } - System.err.println("J") HostAlloc.initialize(-1) - System.err.println("K") } private class BaseRmmEventHandler extends RmmEventHandler { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 665e98ecaab..ea159a5d101 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -70,7 +70,6 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { } val rideAlongList = new util.LinkedList[SpillableColumnarBatch] - // TODO need a way not to leak rideAlongList var rowsRemaining = numOutputRows while (rowsRemaining > 0) { val rowsToAdd = math.min(rowsRemaining, rowsPerBatch) @@ -94,38 +93,18 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { } test("single batch repeat test") { - try { - basicRepeatTest(1000, 1000, 1024 * 1024 * 1024) - } finally { - System.gc() - System.gc() - } + basicRepeatTest(1000, 1000, 1024 * 1024 * 1024) } test("multi batch no split repeat test") { - try { - basicRepeatTest(1000, 100, 1024 * 1024 * 1024) - } finally { - System.gc() - System.gc() - } + basicRepeatTest(1000, 100, 1024 * 1024 * 1024) } test("single batch with split repeat test") { - try { basicRepeatTest(1000, 1000, 4 * 1024) - } finally { - System.gc() - System.gc() - } } test("multi batch with split repeat test") { - try { basicRepeatTest(1000, 100, 4 * 1024) - } finally { - System.gc() - System.gc() - } } } From 57cc117f49901afd7bb37762289dc917d2c68664 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 17 Jan 2024 13:36:18 -0600 Subject: [PATCH 11/14] Some bug fixes --- .../window/GpuUnboundedToUnboundedAggWindowExec.scala | 2 +- .../window/GpuUnboundedToUnboundedAggWindowSuite.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index bb2bd0339f5..e3f370f8a18 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -346,7 +346,7 @@ object PendingSecondAggResults { } if (countForNextTime == 0) { // We got lucky and it is on an agg boundary - val (a, b) = splitCb(cb, splitIndex) + val (a, b) = splitCb(cb, splitIndex + 1) (SpillableColumnarBatch(a, SpillPriorities.ACTIVE_ON_DECK_PRIORITY), SpillableColumnarBatch(b, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) } else { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index ea159a5d101..21062ef08f7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -107,4 +107,14 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { test("multi batch with split repeat test") { basicRepeatTest(1000, 100, 4 * 1024) } + + test("single batch with even split") { + // This is a bit brittle, but the targetSizeBytes is tuned so that we split + // the agg buffer exactly in half, which is a corner case we had to handle + // because the agg generated has two rows. Currently the average row + // size used in the heuristic is 14 bytes + basicRepeatTest(1000, 1000, 500 * 14) + } + + // TODO need a way to set the number of agg rows instead... } From bf9057f1e1b51fbfc72d884bb497f52624c2a846 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 17 Jan 2024 13:56:56 -0600 Subject: [PATCH 12/14] Update tests --- ...puUnboundedToUnboundedAggWindowSuite.scala | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 21062ef08f7..000a097e36f 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids.window +import scala.collection.mutable.ArrayBuffer + import ai.rapids.cudf.{ColumnVector, Scalar, Table} import com.nvidia.spark.rapids.{GpuColumnVector, NoopMetric, RmmSparkRetrySuiteBase, SpillableColumnarBatch, SpillPriorities} import com.nvidia.spark.rapids.Arm.withResource @@ -25,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.types.{DataType, IntegerType, LongType, ShortType} class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { - def basicRepeatTest(numOutputRows: Long, rowsPerBatch: Int, targetSizeBytes: Int) : Unit = { + def basicRepeatTest(numOutputRows: Long, rowsPerRideAlongBatch: Int, + aggGroups: Int, targetSizeBytes: Int) : Unit = { // First I need to setup the operations. I am trying to test repeat in isolation // so we are not going to build them up using the front door val aggOutput = Seq(AttributeReference("my_max", IntegerType, nullable = true)(), @@ -40,12 +43,21 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { val conf = GpuUnboundedToUnboundedAggStages(Seq.empty, Seq.empty, finalProject) def makeRepeatCb(): SpillableColumnarBatch = { - // very basic test to verify that the repeat stage works properly. - val table = withResource(ColumnVector.fromInts(1, 2)) { data1 => - val firstBatchAmount = numOutputRows / 2 - val secondBatchAmount = numOutputRows - firstBatchAmount - withResource(ColumnVector.fromLongs(firstBatchAmount, secondBatchAmount)) { counts => - new Table(data1, counts) + val data = ArrayBuffer[Int]() + val counts = ArrayBuffer[Long]() + var rowRemainingForRepeat = numOutputRows + var groupId = 0 + val rowsPerGroup = math.ceil(numOutputRows.toDouble / aggGroups).toLong + while(rowRemainingForRepeat > 0) { + data.append(groupId) + val rowsInGroup = math.min(rowRemainingForRepeat, rowsPerGroup) + counts.append(rowsInGroup) + groupId += 1 + rowRemainingForRepeat -= rowsInGroup + } + val table = withResource(ColumnVector.fromInts(data: _*)) { dataCv => + withResource(ColumnVector.fromLongs(counts: _*)) { countsCv => + new Table(dataCv, countsCv) } } withResource(table) { _ => @@ -72,7 +84,7 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { val rideAlongList = new util.LinkedList[SpillableColumnarBatch] var rowsRemaining = numOutputRows while (rowsRemaining > 0) { - val rowsToAdd = math.min(rowsRemaining, rowsPerBatch) + val rowsToAdd = math.min(rowsRemaining, rowsPerRideAlongBatch) rowsRemaining -= rowsToAdd rideAlongList.add(makeRideAlongCb(rowsToAdd.toInt)) } @@ -93,28 +105,39 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { } test("single batch repeat test") { - basicRepeatTest(1000, 1000, 1024 * 1024 * 1024) + basicRepeatTest(1000, 1000, 2, 1024 * 1024 * 1024) } test("multi batch no split repeat test") { - basicRepeatTest(1000, 100, 1024 * 1024 * 1024) + basicRepeatTest(1000, 100, 2, 1024 * 1024 * 1024) } test("single batch with split repeat test") { - basicRepeatTest(1000, 1000, 4 * 1024) + basicRepeatTest(1000, 1000, 2, 4 * 1024) } test("multi batch with split repeat test") { - basicRepeatTest(1000, 100, 4 * 1024) + basicRepeatTest(1000, 100, 2, 4 * 1024) + } + + test("single batch split on agg boundary") { + basicRepeatTest(1000, 1000, 1000, 1024) + } + + test("single batch single agg repeat test") { + basicRepeatTest(1000, 1000, 1, 1024 * 1024 * 1024) + } + + test("multi batch no split single agg repeat test") { + basicRepeatTest(1000, 100, 1, 1024 * 1024 * 1024) + } + + test("single batch with split single agg repeat test") { + basicRepeatTest(1000, 1000, 1, 4 * 1024) } - test("single batch with even split") { - // This is a bit brittle, but the targetSizeBytes is tuned so that we split - // the agg buffer exactly in half, which is a corner case we had to handle - // because the agg generated has two rows. Currently the average row - // size used in the heuristic is 14 bytes - basicRepeatTest(1000, 1000, 500 * 14) + test("multi batch with split single agg repeat test") { + basicRepeatTest(1000, 100, 1, 4 * 1024) } - // TODO need a way to set the number of agg rows instead... } From e81385a6fdb5931b92d2de6d2c296670a2991e02 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 17 Jan 2024 14:00:41 -0600 Subject: [PATCH 13/14] Copyright --- .../scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala | 2 +- .../main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 42dd6050e88..5a4086865cf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 460f9ec53c5..b1ee9e7a863 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From d2905944b8372add0acc029e56a35c684eda40c5 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 17 Jan 2024 14:33:45 -0600 Subject: [PATCH 14/14] Some fixes for JDK11 and Scala 2.13 --- .../rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala | 2 +- .../rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala index e3f370f8a18..639a9fe7e41 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowExec.scala @@ -747,7 +747,7 @@ case class GpuUnboundedToUnboundedAggWindowExec( targetSizeBytes: Long) extends GpuWindowBaseExec { override def otherCopyArgs: Seq[AnyRef] = - cpuPartitionSpec :: cpuOrderSpec :: new java.lang.Long(targetSizeBytes) :: Nil + cpuPartitionSpec :: cpuOrderSpec :: targetSizeBytes.asInstanceOf[java.lang.Long] :: Nil // For this we only need the data to be sorted by the partition columns, but // we don't change the input sort from the CPU yet. In some cases we might even diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala index 000a097e36f..5c3158b0d4e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/window/GpuUnboundedToUnboundedAggWindowSuite.scala @@ -55,8 +55,8 @@ class GpuUnboundedToUnboundedAggWindowSuite extends RmmSparkRetrySuiteBase { groupId += 1 rowRemainingForRepeat -= rowsInGroup } - val table = withResource(ColumnVector.fromInts(data: _*)) { dataCv => - withResource(ColumnVector.fromLongs(counts: _*)) { countsCv => + val table = withResource(ColumnVector.fromInts(data.toSeq: _*)) { dataCv => + withResource(ColumnVector.fromLongs(counts.toSeq: _*)) { countsCv => new Table(dataCv, countsCv) } }