Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in framework for unbounded to unbounded window agg optimization #10158

Merged
merged 16 commits into from
Feb 8, 2024
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,6 +81,9 @@ class RapidsBufferCatalog(

private var closed = false

// Human-readable identity for this spill handle: its id plus the current
// spill priority, useful when tracing spill-priority updates in logs.
override def toString: String = "buffer handle " + id + " at " + priority

override def setSpillPriority(newPriority: Long): Unit = {
priority = newPriority
updateUnderlyingRapidsBuffer(this)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -622,7 +622,7 @@ abstract class RapidsBufferStore(val tier: StorageTier)
releaseResources()
}

override def toString: String = s"$name buffer size=${memoryUsedBytes}"
override def toString: String = s"$name buffer size=$memoryUsedBytes"
}
}

Expand Down
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

// Feature flag for the experimental unbounded-to-unbounded window aggregation
// path. Marked .internal() and defaulted to false, so it is opt-in only and
// hidden from user-facing config docs; see the .doc() text for intent.
val ENABLE_WINDOW_UNBOUNDED_AGG = conf("spark.rapids.sql.window.unboundedAgg.enabled")
.doc("This is a temporary internal config to turn on an unbounded to unbounded " +
"window optimization that is still a work in progress. It should eventually replace " +
"the double pass window exec.")
.internal()
.booleanConf
.createWithDefault(false)

val ENABLE_FLOAT_AGG = conf("spark.rapids.sql.variableFloatAgg.enabled")
.doc("Spark assumes that all operations produce the exact same result each time. " +
"This is not true for some floating point aggregations, which can produce slightly " +
Expand Down Expand Up @@ -2458,6 +2466,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isWindowCollectSetEnabled: Boolean = get(ENABLE_WINDOW_COLLECT_SET)

// Cached accessor for ENABLE_WINDOW_UNBOUNDED_AGG (internal flag, default false).
lazy val isWindowUnboundedAggEnabled: Boolean = get(ENABLE_WINDOW_UNBOUNDED_AGG)

lazy val isFloatAggEnabled: Boolean = get(ENABLE_FLOAT_AGG)

lazy val explain: String = get(EXPLAIN)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,12 @@ trait SpillableColumnarBatch extends AutoCloseable {
*/
def setSpillPriority(priority: Long): Unit

/**
 * Increment the reference count for this batch (if applicable) and
 * return this for easy chaining.
 *
 * Implementations that track off-heap data throw IllegalStateException
 * when this is called after the last reference has been closed
 * (use-after-free); row-only implementations are no-ops and just
 * return `this`.
 */
def incRefCount(): SpillableColumnarBatch

/**
* Get the columnar batch.
* @note It is the responsibility of the caller to close the batch.
Expand Down Expand Up @@ -70,6 +76,9 @@ class JustRowsColumnarBatch(numRows: Int)
override val sizeInBytes: Long = 0L

override def dataTypes: Array[DataType] = Array.empty

// There is no off-heap data to track and close() is a no-op for this class,
// so reference counting is unnecessary; simply return this.
override def incRefCount(): SpillableColumnarBatch = this
}

/**
Expand All @@ -83,6 +92,7 @@ class SpillableColumnarBatchImpl (
rowCount: Int,
sparkTypes: Array[DataType])
extends SpillableColumnarBatch {
private var refCount = 1

override def dataTypes: Array[DataType] = sparkTypes
/**
Expand Down Expand Up @@ -113,13 +123,32 @@ class SpillableColumnarBatchImpl (
}
}

/**
 * Take an additional reference on this batch so that one extra close()
 * is required before the underlying handle is released.
 *
 * @return this, for call chaining
 * @throws IllegalStateException if the batch has already been freed
 */
override def incRefCount(): SpillableColumnarBatch = {
  if (refCount > 0) {
    refCount += 1
    this
  } else {
    throw new IllegalStateException("Use after free on SpillableColumnarBatchImpl")
  }
}

/**
* Remove the `ColumnarBatch` from the cache.
*/
override def close(): Unit = {
// closing my reference
handle.close()
refCount -= 1
if (refCount == 0) {
// closing my reference
handle.close()
}
// TODO this is causing problems so we need to look into this
// https://github.com/NVIDIA/spark-rapids/issues/10161
// else if (refCount < 0) {
// throw new IllegalStateException("Double free on SpillableColumnarBatchImpl")
// }
}

// Debug string: spill handle, row count, Spark column types, and the
// current reference count of this spillable batch.
override def toString: String =
  "SCB " + handle + " " + rowCount + " " + sparkTypes.toList + " " + refCount
}

class JustRowsHostColumnarBatch(numRows: Int)
Expand All @@ -135,6 +164,9 @@ class JustRowsHostColumnarBatch(numRows: Int)
override val sizeInBytes: Long = 0L

override def dataTypes: Array[DataType] = Array.empty

// There is no off-heap data to track and close() is a no-op for this class,
// so reference counting is unnecessary; simply return this.
override def incRefCount(): SpillableColumnarBatch = this
}

/**
Expand All @@ -149,6 +181,7 @@ class SpillableHostColumnarBatchImpl (
sparkTypes: Array[DataType],
catalog: RapidsBufferCatalog)
extends SpillableColumnarBatch {
private var refCount = 1

override def dataTypes: Array[DataType] = sparkTypes

Expand Down Expand Up @@ -180,12 +213,25 @@ class SpillableHostColumnarBatchImpl (
}
}

/**
 * Take an additional reference on this host-side batch so that one extra
 * close() is required before the underlying handle is released.
 *
 * @return this, for call chaining
 * @throws IllegalStateException if the batch has already been freed
 */
override def incRefCount(): SpillableColumnarBatch = {
  if (refCount > 0) {
    refCount += 1
    this
  } else {
    throw new IllegalStateException("Use after free on SpillableHostColumnarBatchImpl")
  }
}

/**
* Remove the `ColumnarBatch` from the cache.
*/
override def close(): Unit = {
// closing my reference
handle.close()
refCount -= 1
if (refCount == 0) {
// closing my reference
handle.close()
} else if (refCount < 0) {
throw new IllegalStateException("Double free on SpillableHostColumnarBatchImpl")
}
}
}

Expand Down
Loading
Loading