Skip to content

Commit

Permalink
repartition-based fallback for hash aggregate v3 (#11712)
Browse files Browse the repository at this point in the history
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Signed-off-by: Firestarman <[email protected]>
Co-authored-by: Firestarman <[email protected]>
  • Loading branch information
binmahone and firestarman authored Nov 26, 2024
1 parent e3dce9e commit 4fa0a1d
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 354 deletions.
16 changes: 15 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.ControlThrowable

import com.nvidia.spark.rapids.RapidsPluginImplicits._
Expand Down Expand Up @@ -134,6 +134,20 @@ object Arm extends ArmScalaSpecificImpl {
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
try {
block(r)
} catch {
case t: ControlThrowable =>
// Don't close for these cases..
throw t
case t: Throwable =>
r.safeClose(t)
throw t
}
}


/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: mutable.Queue[T])(block: mutable.Queue[T] => V): V = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
* Just a simple wrapper to make working with buffers of AutoClosable things play
* nicely with withResource.
*/
class AutoClosableArrayBuffer[T <: AutoCloseable] extends AutoCloseable {
val data = new ArrayBuffer[T]()

def append(scb: T): Unit = data.append(scb)

def last: T = data.last

def removeLast(): T = data.remove(data.length - 1)

def foreach[U](f: T => U): Unit = data.foreach(f)

def map[U](f: T => U): Seq[U] = data.map(f).toSeq

def toArray[B >: T : ClassTag]: Array[B] = data.toArray

def size(): Int = data.size

def clear(): Unit = data.clear()

def forall(p: T => Boolean): Boolean = data.forall(p)

def iterator: Iterator[T] = data.iterator

override def toString: String = s"AutoCloseable(${super.toString})"

override def close(): Unit = {
data.foreach(_.close())
data.clear()
}
}
Loading

0 comments on commit 4fa0a1d

Please sign in to comment.