240701 repartition agg (#36)
* workable version without tests

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* doc

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix scala 2.13

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

---------

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Co-authored-by: Hongbin Ma (Mahone) <[email protected]>
wjxiz1992 and binmahone authored Jul 1, 2024
1 parent de9c1b8 commit 21717dd
Showing 5 changed files with 330 additions and 127 deletions.
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.fallbackAlgorithm"></a>spark.rapids.sql.agg.fallbackAlgorithm|When agg cannot be done in a single pass, use sort-based fallback or repartition-based fallback.|sort|Runtime
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
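
The two configs added above are Runtime options, so they can be changed on a live session without a restart. Below is a minimal sketch of setting them from Scala, assuming a SparkSession that already has the RAPIDS Accelerator enabled; the app name, the "repartition" value, and the 0.9 ratio are illustrative choices, not defaults or verified strings from this commit.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the RAPIDS Accelerator plugin is on the classpath and
// enabled for this session.
val spark = SparkSession.builder()
  .appName("repartition-agg-demo") // hypothetical app name
  .getOrCreate()

// Prefer the repartition-based fallback over the default sort-based one when
// an aggregation cannot complete in a single pass.
spark.conf.set("spark.rapids.sql.agg.fallbackAlgorithm", "repartition")

// In non-final aggregation stages, skip the next pass once a pass retains most
// of its input rows (here, more than 90%); the default of 1.0 disables this.
spark.conf.set("spark.rapids.sql.agg.skipAggPassReductionRatio", "0.9")
```
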
19 changes: 17 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -16,7 +16,7 @@
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.ControlThrowable

import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -43,7 +43,8 @@ object Arm extends ArmScalaSpecificImpl {
}

/** Executes the provided code block and then closes the sequence of resources */
def withResource[T <: AutoCloseable, V](r: Seq[T])(block: Seq[T] => V): V = {
def withResource[T <: AutoCloseable, V](r: scala.collection.Seq[T])
(block: scala.collection.Seq[T] => V): V = {
try {
block(r)
} finally {
@@ -134,6 +135,20 @@
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
try {
block(r)
} catch {
case t: ControlThrowable =>
// Don't close the resources for control-flow throwables.
throw t
case t: Throwable =>
r.safeClose(t)
throw t
}
}


/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: mutable.Queue[T])(block: mutable.Queue[T] => V): V = {
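
Two things change in Arm.scala here: withResource now accepts scala.collection.Seq, so both mutable and immutable sequences compile against it (on Scala 2.13 the plain Seq alias is scala.collection.immutable.Seq, which appears to be what the "fix scala 2.13" message refers to), and a closeOnExcept overload is added for ListBuffer. Below is a usage sketch of the new overload; the FakeResource class and allocateAll helper are illustrative stand-ins, not code from this commit.

```scala
import scala.collection.mutable.ListBuffer

import com.nvidia.spark.rapids.Arm.closeOnExcept

// Illustrative stand-in for a GPU-side resource; any AutoCloseable works here.
final class FakeResource(val id: Int) extends AutoCloseable {
  override def close(): Unit = println(s"closed resource $id")
}

// Sketch only: build up a ListBuffer of resources step by step. If any step
// throws, closeOnExcept closes everything collected so far and re-throws; on
// success, ownership of the whole buffer passes to the caller.
def allocateAll(n: Int): ListBuffer[FakeResource] =
  closeOnExcept(new ListBuffer[FakeResource]) { buf =>
    (0 until n).foreach { i =>
      buf += new FakeResource(i) // a later failure closes earlier resources
    }
    buf
  }
```
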