From e94d6bea3ec7a06b1341beddc994774608a201ec Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Tue, 19 Dec 2023 16:53:25 +0800 Subject: [PATCH 1/8] Fix failed cases for non-utc time zone (#10060) * Fix failed cases for non-utc time zone Signed-off-by: Chong Gao * Add doc * Update doc --------- Signed-off-by: Chong Gao Co-authored-by: Chong Gao --- integration_tests/README.md | 20 +++++++++++++++++++ .../src/main/python/array_test.py | 5 ++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index fa06c75f0f9..9e4ba5a378e 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -343,6 +343,26 @@ integration tests. For example: $ DATAGEN_SEED=1702166057 SPARK_HOME=~/spark-3.4.0-bin-hadoop3 integration_tests/run_pyspark_from_build.sh ``` +### Running with non-UTC time zone +For newly added cases, we should check that non-UTC time zones work, or the non-UTC nightly CIs will fail. +The non-UTC nightly CIs verify all cases with a non-UTC time zone. +But only a small number of cases are verified with a non-UTC time zone in the pre-merge CI due to limited GPU resources. +When adding cases, you should also check that non-UTC works besides the default UTC time zone. +Please test the following time zones: +```shell +$ TZ=Iran ./integration_tests/run_pyspark_from_build.sh +$ TZ=America/Los_Angeles ./integration_tests/run_pyspark_from_build.sh +``` +`Iran` is a non-DST (Daylight Saving Time) time zone and `America/Los_Angeles` is a DST time zone. + +If a newly added case fails with non-UTC, then allow the operator (which does not support non-UTC) to fall back to the CPU. +For example, add the following annotation to the case: +```python +non_utc_allow_for_sequence = ['ProjectExec'] # Update after non-utc time zone is supported for sequence +@allow_non_gpu(*non_utc_allow_for_sequence) +def test_my_new_added_case_for_sequence_operator(): ... +``` + ### Reviewing integration tests in Spark History Server If the integration tests are run using [run_pyspark_from_build.sh](run_pyspark_from_build.sh) we have diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index c9749865438..e2d7d1b5c81 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -17,7 +17,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect from data_gen import * from conftest import is_databricks_runtime -from marks import incompat +from marks import incompat, allow_non_gpu from spark_session import is_before_spark_313, is_before_spark_330, is_databricks113_or_later, is_spark_330_or_later, is_databricks104_or_later, is_spark_33X, is_spark_340_or_later, is_spark_330, is_spark_330cdh from pyspark.sql.types import * from pyspark.sql.types import IntegralType @@ -332,11 +332,14 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it) +non_utc_allow_for_sequence = ['ProjectExec'] # Update after non-utc time zone is supported for sequence +@allow_non_gpu(*non_utc_allow_for_sequence) def test_array_transform_non_deterministic(): assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.range(1).selectExpr("transform(sequence(0, cast(rand(5)*10 as int) + 1), x -> x * 22) as t"), conf={'spark.rapids.sql.castFloatToIntegralTypes.enabled': True}) +@allow_non_gpu(*non_utc_allow_for_sequence) def test_array_transform_non_deterministic_second_param():
assert_gpu_and_cpu_are_equal_collect( lambda spark : debug_df(spark.range(1).selectExpr("transform(sequence(0, cast(rand(5)*10 as int) + 1), (x, i) -> x + i) as t")), From 06e214a1054c080df3c489ad56518fde04129dc4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 19 Dec 2023 07:22:10 -0700 Subject: [PATCH 2/8] Fix 332db build failure [databricks] (#10070) * trigger build Signed-off-by: Andy Grove * Fix databricks build failure --------- Signed-off-by: Andy Grove --- .../spark/sql/rapids/metrics/source/MockTaskContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala index 99406da9aba..84766789578 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/metrics/source/MockTaskContext.scala @@ -66,7 +66,7 @@ class MockTaskContext(taskAttemptId: Long, partitionId: Int) extends TaskContext override private[spark] def killTaskIfInterrupted(): Unit = {} - override private[spark] def getKillReason() = None + override def getKillReason() = None override def taskMemoryManager() = null From a613427fcd7a2fba8644fc5968ee08f27797252a Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Mon, 18 Dec 2023 20:43:14 +0800 Subject: [PATCH 3/8] Revert "Support split broadcast join condition into ast and non-ast [databricks] (#9760)" This reverts commit 7c307d489b099bd1d73f8ec4b9b953a05ca24dec. Signed-off-by: Ferdinand Xu --- .../src/main/python/join_test.py | 37 +--- .../com/nvidia/spark/rapids/AstUtil.scala | 166 +----------------- .../GpuBroadcastHashJoinExecBase.scala | 24 --- .../sql/rapids/execution/GpuHashJoin.scala | 110 ++++-------- .../execution/GpuSubPartitionHashJoin.scala | 2 +- .../execution/GpuBroadcastHashJoinExec.scala | 36 ++-- .../execution/GpuBroadcastHashJoinExec.scala | 48 ++--- .../execution/GpuBroadcastHashJoinExec.scala | 35 ++-- 8 files changed, 84 insertions(+), 374 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 9e7f5a13cb9..6660e663c92 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -19,9 +19,8 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture from conftest import is_databricks_runtime, is_emr_runtime, is_not_utc from data_gen import * -from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan, \ - datagen_overrides -from spark_session import with_cpu_session, is_before_spark_330, is_databricks113_or_later, is_databricks_runtime +from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan +from spark_session import with_cpu_session, is_before_spark_330, is_databricks_runtime pytestmark = [pytest.mark.nightly_resource_consuming_test] @@ -435,38 +434,6 @@ def do_join(spark): return broadcast(left).join(right, left.a > f.log(right.r_a), join_type) assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec') -# Allowing non Gpu for ShuffleExchangeExec is mainly for Databricks where its exchange is CPU based ('Exchange SinglePartition, EXECUTOR_BROADCAST'). 
-db_113_cpu_bhj_join_allow=["ShuffleExchangeExec"] if is_databricks113_or_later() else [] - - -@allow_non_gpu(*db_113_cpu_bhj_join_allow) -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen()], ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_broadcast_hash_join_on_non_ast_condition_without_fallback(data_gen, join_type): - # This is to test BHJ with a condition not fully supported by AST. With extra project nodes wrapped, join can still run on GPU other than fallback. - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 25) - # AST does not support cast or logarithm yet - return left.join(right.hint("broadcast"), ((left.b == right.r_b) & (f.round(left.a).cast('integer') > f.round(f.log(right.r_a).cast('integer')))), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = {"spark.rapids.sql.castFloatToIntegralTypes.enabled": True}) - - -@allow_non_gpu('BroadcastHashJoinExec', 'BroadcastExchangeExec') -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen()], ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_broadcast_hash_join_on_non_ast_condition_fallback(data_gen, join_type): - # This is to test BHJ with a condition not fully supported by AST. Since AST doesn't support double, this query fallback to CPU. - # Inner join is not included since it can be supported by GPU via a post filter. - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 25) - # AST does not support cast or logarithm yet and also it's not able to be split as project - # node those both sides are involved in join condition - return left.join(right.hint("broadcast"), ((left.b == right.r_b) & (left.a.cast('double') > right.r_a.cast('double'))), join_type) - assert_gpu_fallback_collect(do_join, 'BroadcastHashJoinExec') - - @ignore_order(local=True) @pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala index 3f68f5d3d60..5062d8e4a99 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AstUtil.scala @@ -16,15 +16,11 @@ package com.nvidia.spark.rapids -import java.io.Serializable - -import com.nvidia.spark.rapids.Arm.withResource import scala.collection.mutable import scala.collection.mutable.ListBuffer -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression, ExprId, NamedExpression} import org.apache.spark.sql.rapids.catalyst.expressions.{GpuEquivalentExpressions, GpuExpressionEquals} -import org.apache.spark.sql.vectorized.ColumnarBatch object AstUtil { @@ -123,164 +119,4 @@ object AstUtil { } } } - - /** - * Transforms the original join condition into extra filter/project when necessary. - * It's targeted for some cases join condition is not fully evaluated by ast. - * Based on join condition, it can be transformed into three major strategies: - * (1) [NoopJoinCondSplit]: noop when join condition can be fully evaluated with ast. - * (2) [JoinCondSplitAsPostFilter]: entire join condition is pulled out as a post filter - * after join condition. 
- * (3) [JoinCondSplitAsProject]: extract not supported join condition into pre-project nodes - * on each join child. One extra project node is introduced to remove intermediate attributes. - */ - abstract class JoinCondSplitStrategy(left: Seq[NamedExpression], - right: Seq[NamedExpression], buildSide: GpuBuildSide) extends Serializable { - - // Actual output of build/stream side project due to join condition split - private[this] val (buildOutputAttr, streamOutputAttr) = buildSide match { - case GpuBuildLeft => (joinLeftOutput, joinRightOutput) - case GpuBuildRight => (joinRightOutput, joinLeftOutput) - } - - // This is the left side child of join. In `split as project` strategy, it may be different - // from original left child with extracted join condition attribute. - def leftOutput(): Seq[NamedExpression] = left - - // This is the right side child of join. In `split as project` strategy, it may be different - // from original right child with extracted join condition attribute. - def rightOutput(): Seq[NamedExpression] = right - - def astCondition(): Option[Expression] - - def processBuildSideAndClose(input: ColumnarBatch): ColumnarBatch = input - - def processStreamSideAndClose(input: ColumnarBatch): ColumnarBatch = input - - def processPostJoin(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = iter - - // This is the left side child of join. In `split as project` strategy, it may be different - // from original left child with extracted join condition attribute. - def joinLeftOutput(): Seq[Attribute] = leftOutput.map(expr => expr.toAttribute) - - // This is the right side child of join. In `split as project` strategy, it may be different - // from original right child with extracted join condition attribute. - def joinRightOutput(): Seq[Attribute] = rightOutput.map(expr => expr.toAttribute) - - // Updated build attribute list after join condition split as project node. - // It may include extra attributes from split join condition. - def buildSideOutput(): Seq[Attribute] = buildOutputAttr - - // Updated stream attribute list after join condition split as project node. - // It may include extra attributes from split join condition. - def streamedSideOutput(): Seq[Attribute] = streamOutputAttr - } - - // For the case entire join condition can be evaluated as ast. - case class NoopJoinCondSplit(condition: Option[Expression], left: Seq[NamedExpression], - right: Seq[NamedExpression], buildSide: GpuBuildSide) - extends JoinCondSplitStrategy(left, right, buildSide) { - override def astCondition(): Option[Expression] = condition - } - - // For inner joins we can apply a post-join condition for any conditions that cannot be - // evaluated directly in a mixed join that leverages a cudf AST expression. 
- case class JoinCondSplitAsPostFilter(expr: Option[Expression], - attributeSeq: Seq[Attribute], left: Seq[NamedExpression], - right: Seq[NamedExpression], buildSide: GpuBuildSide) - extends JoinCondSplitStrategy(left, right, buildSide) { - private[this] val postFilter = expr.map { e => - GpuBindReferences.bindGpuReferencesTiered( - Seq(e), attributeSeq, false) - } - - override def astCondition(): Option[Expression] = None - - override def processPostJoin(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { - postFilter.map { filter => - iter.flatMap { cb => - GpuFilter.filterAndClose(cb, filter, NoopMetric, NoopMetric, NoopMetric) - } - }.getOrElse(iter) - } - } - - /** - * This is the split strategy targeting on the case where ast not supported join condition can be - * extracted and wrapped into extra project node(s). - * - * @param astCond remained join condition after extracting ast not supported parts - * @param left original expressions from join's left child. It's left project input - * attribute. - * @param leftProj extra expressions extracted from original join condition which is not - * supported by ast. It will be evaluated as a project on left side batch. - * @param right original expressions from join's right child. It's left project input - * attribute. - * @param rightProj extra expressions extracted from original join condition which is not - * supported by ast. It will be evaluated as a project on right side batch. - * @param post eliminate the extra columns introduced by join condition split - * @param buildSide indicates which side is build - */ - case class JoinCondSplitAsProject( - astCond: Option[Expression], - left: Seq[NamedExpression], leftProj: Seq[NamedExpression], - right: Seq[NamedExpression], rightProj: Seq[NamedExpression], - post: Seq[NamedExpression], buildSide: GpuBuildSide - ) extends JoinCondSplitStrategy(left ++ leftProj, right ++ rightProj, buildSide) { - private[this] val leftInput = left.map(_.toAttribute) - private[this] val rightInput = right.map(_.toAttribute) - - // Used to build build/stream side project - private[this] val (buildOutput, streamOutput, buildInput, streamInput) = buildSide match { - case GpuBuildLeft => - (leftOutput, rightOutput, leftInput, rightInput) - case GpuBuildRight => - (rightOutput, leftOutput, rightInput, leftInput) - } - - private[this] val buildProj = if (!buildOutput.isEmpty) { - Some(GpuBindReferences.bindGpuReferencesTiered(buildOutput, buildInput, false)) - } else None - - private[this] val streamProj = if (!streamOutput.isEmpty) { - Some(GpuBindReferences.bindGpuReferencesTiered(streamOutput, streamInput, false)) - } else None - - // Remove the intermediate attributes from left and right side project nodes. Output attributes - // need to be updated based on join type. And its attributes covers both original plan and - // extra project node. 
- private[this] val postProj = if (!post.isEmpty) { - Some( - GpuBindReferences.bindGpuReferencesTiered( - post, (leftOutput ++ rightOutput).map(_.toAttribute), false)) - } else None - - override def astCondition(): Option[Expression] = astCond - - override def processBuildSideAndClose(input: ColumnarBatch): ColumnarBatch = { - buildProj.map { pj => - withResource(input) { cb => - pj.project(cb) - } - }.getOrElse(input) - } - - override def processStreamSideAndClose(input: ColumnarBatch): ColumnarBatch = { - streamProj.map { pj => - withResource(input) { cb => - pj.project(cb) - } - }.getOrElse(input) - } - - override def processPostJoin(iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { - postProj.map { proj => - iter.map { cb => - withResource(cb) { b => - proj.project(b) - } - } - }.getOrElse(iter) - } - } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala index 7531223fba6..4982c6e3c9c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.rapids.execution import ai.rapids.cudf.{NvtxColor, NvtxRange} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.AstUtil.JoinCondSplitStrategy import com.nvidia.spark.rapids.shims.{GpuBroadcastJoinMeta, ShimBinaryExecNode} import org.apache.spark.TaskContext @@ -56,28 +55,6 @@ abstract class GpuBroadcastHashJoinMetaBase( override val childExprs: Seq[BaseExprMeta[_]] = leftKeys ++ rightKeys ++ conditionMeta - private var taggedForAstCheck = false - - // Avoid checking multiple times - private var isAstCond = false - - /** - * Check whether condition can be ast-able. It includes two cases: 1) all join conditions are - * ast-able; 2) join conditions are ast-able after split and push down to child plans. 
- */ - def canJoinCondAstAble(): Boolean = { - if (!taggedForAstCheck) { - val Seq(leftPlan, rightPlan) = childPlans - isAstCond = conditionMeta match { - case Some(e) => AstUtil.canExtractNonAstConditionIfNeed( - e, leftPlan.outputAttributes.map(_.exprId), rightPlan.outputAttributes.map(_.exprId)) - case None => true - } - taggedForAstCheck = true - } - isAstCond - } - override def tagPlanForGpu(): Unit = { GpuHashJoin.tagJoin(this, join.joinType, buildSide, join.leftKeys, join.rightKeys, conditionMeta) @@ -126,7 +103,6 @@ abstract class GpuBroadcastHashJoinExecBase( joinType: JoinType, buildSide: GpuBuildSide, override val condition: Option[Expression], - override val joinCondSplitStrategy: JoinCondSplitStrategy, left: SparkPlan, right: SparkPlan) extends ShimBinaryExecNode with GpuHashJoin { import GpuMetric._ diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index f86f75104a3..cbaa1cbe47c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -19,7 +19,6 @@ import ai.rapids.cudf.{ColumnView, DType, GatherMap, GroupByAggregation, NullEqu import ai.rapids.cudf.ast.CompiledExpression import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.AstUtil.{JoinCondSplitStrategy, NoopJoinCondSplit} import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{withRestoreOnRetry, withRetryNoSplit} import com.nvidia.spark.rapids.jni.GpuOOM @@ -117,11 +116,9 @@ object GpuHashJoin { joinType match { case _: InnerLike => case RightOuter | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - // First to check whether can be split if not ast-able. If false, then check requireAst to - // send not-work-on-GPU reason if not replace-able. - conditionMeta.foreach(cond => if (!canJoinCondAstAble(meta)) meta.requireAstForGpuOn(cond)) + conditionMeta.foreach(meta.requireAstForGpuOn) case FullOuter => - conditionMeta.foreach(cond => if (!canJoinCondAstAble(meta)) meta.requireAstForGpuOn(cond)) + conditionMeta.foreach(meta.requireAstForGpuOn) // FullOuter join cannot support with struct keys as two issues below // * https://github.com/NVIDIA/spark-rapids/issues/2126 // * https://github.com/rapidsai/cudf/issues/7947 @@ -141,15 +138,6 @@ object GpuHashJoin { } } - // Check whether the entire tree is ast-able or being able to split non-ast-able conditions - // into child nodes. Now only support broad hash join. - private[this] def canJoinCondAstAble(meta: SparkPlanMeta[_]): Boolean = { - meta match { - case meta: GpuBroadcastHashJoinMeta => meta.canJoinCondAstAble - case _ => false - } - } - /** Determine if this type of join supports using the right side of the join as the build side. 
*/ def canBuildRight(joinType: JoinType): Boolean = joinType match { case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | FullOuter | _: ExistenceJoin => true @@ -266,25 +254,6 @@ object GpuHashJoin { keys.forall(_.dataType.isInstanceOf[IntegralType]) && keys.map(_.dataType.defaultSize).sum <= 8 } - - def output(joinType: JoinType, left: Seq[Attribute], right: Seq[Attribute]): Seq[Attribute] = { - joinType match { - case _: InnerLike => - left ++ right - case LeftOuter => - left ++ right.map(_.withNullability(true)) - case RightOuter => - left.map(_.withNullability(true)) ++ right - case j: ExistenceJoin => - left :+ j.exists - case LeftExistence(_) => - left - case FullOuter => - left.map(_.withNullability(true)) ++ right.map(_.withNullability(true)) - case x => - throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") - } - } } abstract class BaseHashJoinIterator( @@ -897,8 +866,6 @@ trait GpuHashJoin extends GpuExec { def leftKeys: Seq[Expression] def rightKeys: Seq[Expression] def buildSide: GpuBuildSide - def joinCondSplitStrategy: JoinCondSplitStrategy = NoopJoinCondSplit( - condition, left.output, right.output, buildSide) protected lazy val (buildPlan, streamedPlan) = buildSide match { case GpuBuildLeft => (left, right) @@ -918,7 +885,22 @@ trait GpuHashJoin extends GpuExec { } override def output: Seq[Attribute] = { - GpuHashJoin.output(joinType, left.output, right.output) + joinType match { + case _: InnerLike => + left.output ++ right.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case j: ExistenceJoin => + left.output :+ j.exists + case LeftExistence(_) => + left.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case x => + throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") + } } // If we have a single batch streamed in then we will produce a single batch of output @@ -971,10 +953,8 @@ trait GpuHashJoin extends GpuExec { GpuHashJoin.anyNullableStructChild(buildKeys) protected lazy val (boundBuildKeys, boundStreamKeys) = { - val lkeys = - GpuBindReferences.bindGpuReferences(leftKeys, joinCondSplitStrategy.joinLeftOutput) - val rkeys = - GpuBindReferences.bindGpuReferences(rightKeys, joinCondSplitStrategy.joinRightOutput) + val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) + val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) buildSide match { case GpuBuildLeft => (lkeys, rkeys) @@ -983,25 +963,14 @@ trait GpuHashJoin extends GpuExec { } protected lazy val (numFirstConditionTableColumns, boundCondition) = { - val joinLeft = joinType match { - case RightOuter => - if(buildSide == GpuBuildRight) { - joinCondSplitStrategy.buildSideOutput - } else { - joinCondSplitStrategy.streamedSideOutput - } - case _ => - if (buildSide == GpuBuildRight) { - joinCondSplitStrategy.streamedSideOutput - } else { - joinCondSplitStrategy.buildSideOutput - } + val (joinLeft, joinRight) = joinType match { + case RightOuter => (right, left) + case _ => (left, right) } val boundCondition = condition.map { c => - GpuBindReferences.bindGpuReference(c, - joinCondSplitStrategy.streamedSideOutput ++ joinCondSplitStrategy.buildSideOutput) + GpuBindReferences.bindGpuReference(c, joinLeft.output ++ joinRight.output) } - (joinLeft.size, boundCondition) + (joinLeft.output.size, boundCondition) } def doJoin( @@ -1025,14 +994,13 
@@ trait GpuHashJoin extends GpuExec { builtBatch } - val spillableBuiltBatch = withResource(joinCondSplitStrategy - .processBuildSideAndClose(nullFiltered)) { + val spillableBuiltBatch = withResource(nullFiltered) { LazySpillableColumnarBatch(_, "built") } val lazyStream = stream.map { cb => - withResource(joinCondSplitStrategy.processStreamSideAndClose(cb)) { updatedBatch => - LazySpillableColumnarBatch(updatedBatch, "stream_batch") + withResource(cb) { cb => + LazySpillableColumnarBatch(cb, "stream_batch") } } @@ -1051,29 +1019,25 @@ trait GpuHashJoin extends GpuExec { opTime, joinTime) case FullOuter => - new HashFullJoinIterator( - spillableBuiltBatch, boundBuildKeys, lazyStream, - boundStreamKeys, joinCondSplitStrategy.streamedSideOutput, boundCondition, - numFirstConditionTableColumns, targetSize, buildSide, compareNullsEqual, opTime, - joinTime) + new HashFullJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, + boundStreamKeys, streamedPlan.output, boundCondition, numFirstConditionTableColumns, + targetSize, buildSide, compareNullsEqual, opTime, joinTime) case _ => if (boundCondition.isDefined) { // ConditionalHashJoinIterator will close the compiled condition val compiledCondition = boundCondition.get.convertToAst(numFirstConditionTableColumns).compile() - new ConditionalHashJoinIterator( - spillableBuiltBatch, boundBuildKeys, lazyStream, - boundStreamKeys, joinCondSplitStrategy.streamedSideOutput, compiledCondition, + new ConditionalHashJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, + boundStreamKeys, streamedPlan.output, compiledCondition, targetSize, joinType, buildSide, compareNullsEqual, opTime, joinTime) } else { - new HashJoinIterator( - spillableBuiltBatch, boundBuildKeys, lazyStream, boundStreamKeys, - joinCondSplitStrategy.streamedSideOutput, targetSize, joinType, buildSide, - compareNullsEqual, opTime, joinTime) + new HashJoinIterator(spillableBuiltBatch, boundBuildKeys, lazyStream, boundStreamKeys, + streamedPlan.output, targetSize, joinType, buildSide, compareNullsEqual, + opTime, joinTime) } } - joinCondSplitStrategy.processPostJoin(joinIterator).map { cb => + joinIterator.map { cb => joinOutputRows += cb.numRows() numOutputRows += cb.numRows() numOutputBatches += 1 diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala index ccaf92f89dc..b4582b3e0d5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala @@ -593,7 +593,7 @@ trait GpuSubPartitionHashJoin extends Logging { self: GpuHashJoin => } } // Leverage the original join iterators - val joinIter = doJoin(buildCb, streamIter, targetSize, + val joinIter = doJoin(buildCb, streamIter, targetSize, numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime) Some(joinIter) } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala index cd640057a58..0b1be70234b 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala @@ -35,7 +35,6 @@ spark-rapids-shim-json-lines 
***/ package org.apache.spark.sql.rapids.execution import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.AstUtil.{JoinCondSplitAsPostFilter, JoinCondSplitAsProject, JoinCondSplitStrategy, NoopJoinCondSplit} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.JoinType @@ -49,6 +48,12 @@ class GpuBroadcastHashJoinMeta( rule: DataFromReplacementRule) extends GpuBroadcastHashJoinMetaBase(join, conf, parent, rule) { override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSideMeta = buildSide match { @@ -56,32 +61,16 @@ class GpuBroadcastHashJoinMeta( case GpuBuildRight => right } verifyBuildSideWasReplaced(buildSideMeta) - // First to check whether we can extract some non-supported AST conditions. If not, will do a - // post-join filter right after hash join node. Otherwise, do split as project. - val nonAstJoinCond = if (!canJoinCondAstAble()) { - JoinCondSplitAsPostFilter( - conditionMeta.map(_.convertToGpu()), GpuHashJoin.output( - join.joinType, left.output, right.output), left.output, right.output, buildSide) - } else { - val (remain, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond( - conditionMeta, left.output, right.output, true) - if (leftExpr.isEmpty && rightExpr.isEmpty) { - NoopJoinCondSplit(remain, left.output, right.output, buildSide) - } else { - JoinCondSplitAsProject( - remain, left.output, leftExpr, right.output, rightExpr, - GpuHashJoin.output(join.joinType, left.output, right.output), buildSide) - } - } - - GpuBroadcastHashJoinExec( + val joinExec = GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - nonAstJoinCond.astCondition(), - nonAstJoinCond, + joinCondition, left, right) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)()).getOrElse(joinExec) } } @@ -91,7 +80,6 @@ case class GpuBroadcastHashJoinExec( joinType: JoinType, buildSide: GpuBuildSide, override val condition: Option[Expression], - override val joinCondSplitStrategy: JoinCondSplitStrategy, left: SparkPlan, right: SparkPlan) extends GpuBroadcastHashJoinExecBase( - leftKeys, rightKeys, joinType, buildSide, condition, joinCondSplitStrategy, left, right) \ No newline at end of file + leftKeys, rightKeys, joinType, buildSide, condition, left, right) \ No newline at end of file diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala index 5db2c11c3b1..ca4b0dfa31a 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala @@ -24,7 +24,6 @@ package org.apache.spark.sql.rapids.execution import ai.rapids.cudf.{NvtxColor, NvtxRange} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import 
com.nvidia.spark.rapids.AstUtil.{JoinCondSplitAsPostFilter, JoinCondSplitAsProject, JoinCondSplitStrategy, NoopJoinCondSplit} import org.apache.spark.TaskContext import org.apache.spark.rapids.shims.GpuShuffleExchangeExec @@ -46,6 +45,12 @@ class GpuBroadcastHashJoinMeta( rule: DataFromReplacementRule) extends GpuBroadcastHashJoinMetaBase(join, conf, parent, rule) { override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSideMeta = buildSide match { @@ -53,32 +58,18 @@ class GpuBroadcastHashJoinMeta( case GpuBuildRight => right } verifyBuildSideWasReplaced(buildSideMeta) - // First to check whether we can extract some non-supported AST conditions. If not, will do a - // post-join filter right after hash join node. Otherwise, do split as project. - val nonAstJoinCond = if (!canJoinCondAstAble()) { - JoinCondSplitAsPostFilter( - conditionMeta.map(_.convertToGpu()), GpuHashJoin.output( - join.joinType, left.output, right.output), left.output, right.output, buildSide) - } else { - val (remain, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond( - conditionMeta, left.output, right.output, true) - if (leftExpr.isEmpty && rightExpr.isEmpty) { - NoopJoinCondSplit(remain, left.output, right.output, buildSide) - } else { - JoinCondSplitAsProject( - remain, left.output, leftExpr, right.output, rightExpr, - GpuHashJoin.output(join.joinType, left.output, right.output), buildSide) - } - } - - GpuBroadcastHashJoinExec( + val joinExec = GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - nonAstJoinCond.astCondition(), - nonAstJoinCond, - left, right, join.isExecutorBroadcast) + joinCondition, + left, + right, + join.isExecutorBroadcast) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)()).getOrElse(joinExec) } } @@ -88,12 +79,11 @@ case class GpuBroadcastHashJoinExec( joinType: JoinType, buildSide: GpuBuildSide, override val condition: Option[Expression], - override val joinCondSplitStrategy: JoinCondSplitStrategy, left: SparkPlan, - right: SparkPlan, + right: SparkPlan, executorBroadcast: Boolean) - extends GpuBroadcastHashJoinExecBase(leftKeys, rightKeys, joinType, buildSide, - condition, joinCondSplitStrategy, left, right) { + extends GpuBroadcastHashJoinExecBase( + leftKeys, rightKeys, joinType, buildSide, condition, left, right) { import GpuMetric._ override lazy val additionalMetrics: Map[String, GpuMetric] = Map( @@ -157,8 +147,8 @@ case class GpuBroadcastHashJoinExec( GpuSemaphore.acquireIfNecessary(TaskContext.get()) } } - val buildBatch = GpuExecutorBroadcastHelper.getExecutorBroadcastBatch( - buildRelation, buildSchema, buildOutput, metricsMap, targetSize) + val buildBatch = GpuExecutorBroadcastHelper.getExecutorBroadcastBatch(buildRelation, + buildSchema, buildOutput, metricsMap, targetSize) (buildBatch, bufferedStreamIter) } } diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala 
index e2a8bb51ba5..4985d791829 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExec.scala @@ -22,7 +22,6 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.rapids.execution import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.AstUtil.{JoinCondSplitAsPostFilter, JoinCondSplitAsProject, JoinCondSplitStrategy, NoopJoinCondSplit} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.JoinType @@ -36,6 +35,12 @@ class GpuBroadcastHashJoinMeta( rule: DataFromReplacementRule) extends GpuBroadcastHashJoinMetaBase(join, conf, parent, rule) { override def convertToGpu(): GpuExec = { + val condition = conditionMeta.map(_.convertToGpu()) + val (joinCondition, filterCondition) = if (conditionMeta.forall(_.canThisBeAst)) { + (condition, None) + } else { + (None, condition) + } val Seq(left, right) = childPlans.map(_.convertIfNeeded()) // The broadcast part of this must be a BroadcastExchangeExec val buildSideMeta = buildSide match { @@ -43,31 +48,16 @@ class GpuBroadcastHashJoinMeta( case GpuBuildRight => right } verifyBuildSideWasReplaced(buildSideMeta) - // First to check whether we can extract some non-supported AST conditions. If not, will do a - // post-join filter right after hash join node. Otherwise, do split as project. - val nonAstJoinCond = if (!canJoinCondAstAble()) { - JoinCondSplitAsPostFilter(conditionMeta.map(_.convertToGpu()), GpuHashJoin.output( - join.joinType, left.output, right.output), left.output, right.output, buildSide) - } else { - val (remain, leftExpr, rightExpr) = AstUtil.extractNonAstFromJoinCond( - conditionMeta, left.output, right.output, true) - if(leftExpr.isEmpty && rightExpr.isEmpty) { - NoopJoinCondSplit(remain, left.output, right.output, buildSide) - } else { - JoinCondSplitAsProject( - remain, left.output, leftExpr, right.output, rightExpr, - GpuHashJoin.output(join.joinType, left.output, right.output), buildSide) - } - } - - GpuBroadcastHashJoinExec( + val joinExec = GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, buildSide, - nonAstJoinCond.astCondition(), - nonAstJoinCond, + joinCondition, left, right) + // For inner joins we can apply a post-join condition for any conditions that cannot be + // evaluated directly in a mixed join that leverages a cudf AST expression + filterCondition.map(c => GpuFilterExec(c, joinExec)()).getOrElse(joinExec) } } @@ -77,7 +67,6 @@ case class GpuBroadcastHashJoinExec( joinType: JoinType, buildSide: GpuBuildSide, override val condition: Option[Expression], - override val joinCondSplitStrategy: JoinCondSplitStrategy, left: SparkPlan, right: SparkPlan) extends GpuBroadcastHashJoinExecBase( - leftKeys, rightKeys, joinType, buildSide, condition, joinCondSplitStrategy, left, right) + leftKeys, rightKeys, joinType, buildSide, condition, left, right) From 2edf82dee17757f54330bd0628a562acbb7b6a3b Mon Sep 17 00:00:00 2001 From: Jim Brennan Date: Tue, 19 Dec 2023 14:24:34 -0600 Subject: [PATCH 4/8] Improve host memory spill interfaces (#10065) Signed-off-by: Jim Brennan --- .../InternalRowToColumnarBatchIterator.java | 107 ++++++++++-------- .../nvidia/spark/rapids/RapidsBuffer.scala | 14 ++- .../spark/rapids/RapidsBufferCatalog.scala | 38 ++++++- .../spark/rapids/RapidsBufferStore.scala | 52 +++++++-- 
.../nvidia/spark/rapids/RapidsDiskStore.scala | 55 ++++----- .../spark/rapids/RapidsHostMemoryStore.scala | 86 ++++++++------ .../spark/rapids/SpillableColumnarBatch.scala | 36 +----- .../rapids/RapidsBufferCatalogSuite.scala | 3 +- .../rapids/RapidsHostMemoryStoreSuite.scala | 59 ++++++---- .../rapids/SpillableColumnarBatchSuite.scala | 3 +- 10 files changed, 266 insertions(+), 187 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java index a1f878cb078..7d7046b2f24 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/InternalRowToColumnarBatchIterator.java @@ -147,55 +147,56 @@ public ColumnarBatch next() { // Update our estimate for number of rows with the final size used to allocate the buffers. numRowsEstimate = (int) bufsAndNumRows._2.targetSize(); long dataLength = calcDataLengthEstimate(numRowsEstimate); - try ( - SpillableHostBuffer sdb = bufsAndNumRows._1[0]; - SpillableHostBuffer sob = bufsAndNumRows._1[1]; + int used[]; + try (SpillableHostBuffer spillableDataBuffer = bufsAndNumRows._1[0]; + SpillableHostBuffer spillableOffsetsBuffer = bufsAndNumRows._1[1]; ) { - // Fill in buffer under write lock for host buffers - batchAndRange = sdb.withHostBufferWriteLock( (dataBuffer) -> { - return sob.withHostBufferWriteLock( (offsetsBuffer) -> { - int[] used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); - int dataOffset = used[0]; - int currentRow = used[1]; - // We don't want to loop forever trying to copy nothing - assert (currentRow > 0); - if (numInputRows != null) { - numInputRows.add(currentRow); - } - if (numOutputRows != null) { - numOutputRows.add(currentRow); - } - if (numOutputBatches != null) { - numOutputBatches.add(1); - } - // Now that we have filled the buffers with the data, we need to turn them into a - // HostColumnVector and copy them to the device so the GPU can turn it into a Table. - // To do this we first need to make a HostColumnCoreVector for the data, and then - // put that into a HostColumnVector as its child. This the basics of building up - // a column of lists of bytes in CUDF but it is typically hidden behind the higer level - // APIs. - dataBuffer.incRefCount(); - offsetsBuffer.incRefCount(); - try (HostColumnVectorCore dataCv = - new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), - dataBuffer, null, null, new ArrayList<>()); - HostColumnVector hostColumn = new HostColumnVector(DType.LIST, - currentRow, Optional.of(0L), null, null, - offsetsBuffer, Collections.singletonList(dataCv))) { + HostMemoryBuffer[] hBufs = + getHostBuffersWithRetry(spillableDataBuffer, spillableOffsetsBuffer); + try(HostMemoryBuffer dataBuffer = hBufs[0]; + HostMemoryBuffer offsetsBuffer = hBufs[1]; + ) { + used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate); + int dataOffset = used[0]; + int currentRow = used[1]; + // We don't want to loop forever trying to copy nothing + assert (currentRow > 0); + if (numInputRows != null) { + numInputRows.add(currentRow); + } + if (numOutputRows != null) { + numOutputRows.add(currentRow); + } + if (numOutputBatches != null) { + numOutputBatches.add(1); + } + // Now that we have filled the buffers with the data, we need to turn them into a + // HostColumnVector and copy them to the device so the GPU can turn it into a Table. 
+ // To do this we first need to make a HostColumnCoreVector for the data, and then + // put that into a HostColumnVector as its child. This the basics of building up + // a column of lists of bytes in CUDF but it is typically hidden behind the higer level + // APIs. + dataBuffer.incRefCount(); + offsetsBuffer.incRefCount(); + try (HostColumnVectorCore dataCv = + new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L), + dataBuffer, null, null, new ArrayList<>()); + HostColumnVector hostColumn = new HostColumnVector(DType.LIST, + currentRow, Optional.of(0L), null, null, + offsetsBuffer, Collections.singletonList(dataCv))) { - long ct = System.nanoTime() - collectStart; - streamTime.add(ct); + long ct = System.nanoTime() - collectStart; + streamTime.add(ct); - // Grab the semaphore because we are about to put data onto the GPU. - GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); - NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, - Option.apply(opTime)); - ColumnVector devColumn = - RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); - return Tuple2.apply(makeSpillableBatch(devColumn), range); - } - }); - }); + // Grab the semaphore because we are about to put data onto the GPU. + GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get()); + NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN, + Option.apply(opTime)); + ColumnVector devColumn = + RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice); + batchAndRange = Tuple2.apply(makeSpillableBatch(devColumn), range); + } + } } try (NvtxRange ignored = batchAndRange._2; Table tab = @@ -208,6 +209,20 @@ public ColumnarBatch next() { } } + private HostMemoryBuffer[] getHostBuffersWithRetry( + SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) { + return RmmRapidsRetryIterator.withRetryNoSplit( () -> { + try (HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer(); + HostMemoryBuffer offsetsBuffer = spillableOffsetsBuffer.getHostBuffer(); + ) { + // Increment these to keep them. + dataBuffer.incRefCount(); + offsetsBuffer.incRefCount(); + return new HostMemoryBuffer[] { dataBuffer, offsetsBuffer }; + } + }); + } + private Tuple2 allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) { HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null }; diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index 657cbb33dd0..a332755745f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -21,7 +21,7 @@ import java.nio.channels.WritableByteChannel import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table} +import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.StorageTier.StorageTier @@ -320,6 +320,15 @@ trait RapidsBuffer extends AutoCloseable { */ def getDeviceMemoryBuffer: DeviceMemoryBuffer + /** + * Get the host memory buffer from the underlying storage. If the buffer currently resides + * outside of host memory, a new HostMemoryBuffer is created with the data copied over. + * The caller must have successfully acquired the buffer beforehand. 
+ * @see [[addReference]] + * @note It is the responsibility of the caller to close the buffer. + */ + def getHostMemoryBuffer: HostMemoryBuffer + /** * Try to add a reference to this buffer to acquire it. * @note The close method must be called for every successfully obtained reference. @@ -425,6 +434,9 @@ sealed class DegenerateRapidsBuffer( override def getDeviceMemoryBuffer: DeviceMemoryBuffer = throw new UnsupportedOperationException("degenerate buffer has no device memory buffer") + override def getHostMemoryBuffer: HostMemoryBuffer = + throw new UnsupportedOperationException("degenerate buffer has no host memory buffer") + override def addReference(): Boolean = true override def getSpillPriority: Long = Long.MaxValue diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 61a636c1708..f98b52ae022 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -613,7 +613,7 @@ class RapidsBufferCatalog( } } - def updateTiers(bufferSpill: BufferSpill): Long = bufferSpill match { + def updateTiers(bufferSpill: SpillAction): Long = bufferSpill match { case BufferSpill(spilledBuffer, maybeNewBuffer) => logDebug(s"Spilled ${spilledBuffer.id} from tier ${spilledBuffer.storageTier}. " + s"Removing. Registering ${maybeNewBuffer.map(_.id).getOrElse ("None")} " + @@ -621,6 +621,14 @@ class RapidsBufferCatalog( maybeNewBuffer.foreach(registerNewBuffer) removeBufferTier(spilledBuffer.id, spilledBuffer.storageTier) spilledBuffer.memoryUsedBytes + + case BufferUnspill(unspilledBuffer, maybeNewBuffer) => + logDebug(s"Unspilled ${unspilledBuffer.id} from tier ${unspilledBuffer.storageTier}. " + + s"Removing. Registering ${maybeNewBuffer.map(_.id).getOrElse ("None")} " + + s"${maybeNewBuffer}") + maybeNewBuffer.foreach(registerNewBuffer) + removeBufferTier(unspilledBuffer.id, unspilledBuffer.storageTier) + unspilledBuffer.memoryUsedBytes } /** @@ -647,6 +655,34 @@ class RapidsBufferCatalog( } } + /** + * Copies `buffer` to the `hostStorage` store, registering a new `RapidsBuffer` in + * the process + * + * @param buffer - buffer to copy + * @param stream - Cuda.Stream to synchronize on + * @return - The `RapidsBuffer` instance that was added to the host store. + */ + def unspillBufferToHostStore( + buffer: RapidsBuffer, + stream: Cuda.Stream): RapidsBuffer = synchronized { + // try to acquire the buffer, if it's already in the store + // do not create a new one, else add a reference + acquireBuffer(buffer.id, StorageTier.HOST) match { + case Some(existingBuffer) => existingBuffer + case None => + val maybeNewBuffer = hostStorage.copyBuffer(buffer, this, stream) + maybeNewBuffer.map { newBuffer => + logDebug(s"got new RapidsHostMemoryStore buffer ${newBuffer.id}") + newBuffer.addReference() // add a reference since we are about to use it + updateTiers(BufferUnspill(buffer, Some(newBuffer))) + buffer.safeFree() + newBuffer + }.get // the host store has to return a buffer here for now, or throw OOM + } + } + + /** * Remove a buffer ID from the catalog at the specified storage tier. 
* @note public for testing diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index 511686f8557..98023259d82 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import ai.rapids.cudf.{BaseDeviceMemoryBuffer, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.StorageTier.{DEVICE, StorageTier} +import com.nvidia.spark.rapids.StorageTier.{DEVICE, HOST, StorageTier} import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.internal.Logging @@ -32,13 +32,22 @@ import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch /** - * A helper case class that contains the buffer we spilled from our current tier - * and likely a new buffer created in a spill store tier, but it can be set to None. - * If the buffer already exists in the target spill store, `newBuffer` will be None. - * @param spilledBuffer a `RapidsBuffer` we spilled from this store - * @param newBuffer an optional `RapidsBuffer` in the target spill store. + * Helper case classes that contain the buffer we spilled or unspilled from our current tier + * and likely a new buffer created in a target store tier, but it can be set to None. + * If the buffer already exists in the target store, `newBuffer` will be None. + * @param spillBuffer a `RapidsBuffer` we spilled or unspilled from this store + * @param newBuffer an optional `RapidsBuffer` in the target store. */ -case class BufferSpill(spilledBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer]) +trait SpillAction { + val spillBuffer: RapidsBuffer + val newBuffer: Option[RapidsBuffer] +} + +case class BufferSpill(spillBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer]) + extends SpillAction + +case class BufferUnspill(spillBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer]) + extends SpillAction /** * Base class for all buffer store types. @@ -307,7 +316,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) // as it has already spilled. BufferSpill(nextSpillableBuffer, None) } - totalSpilled += bufferSpill.spilledBuffer.memoryUsedBytes + totalSpilled += bufferSpill.spillBuffer.memoryUsedBytes bufferSpills.append(bufferSpill) catalog.updateTiers(bufferSpill) } @@ -333,7 +342,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) // the buffer via events. 
// https://github.com/NVIDIA/spark-rapids/issues/8610 Cuda.deviceSynchronize() - bufferSpills.foreach(_.spilledBuffer.safeFree()) + bufferSpills.foreach(_.spillBuffer.safeFree()) } } } @@ -516,6 +525,31 @@ abstract class RapidsBufferStore(val tier: StorageTier) } } + override def getHostMemoryBuffer: HostMemoryBuffer = { + (0 until MAX_UNSPILL_ATTEMPTS).foreach { _ => + catalog.acquireBuffer(id, HOST) match { + case Some(buffer) => + withResource(buffer) { _ => + return buffer.getHostMemoryBuffer + } + case _ => + try { + logDebug(s"Unspilling $this $id to $HOST") + val newBuffer = catalog.unspillBufferToHostStore( + this, + Cuda.DEFAULT_STREAM) + withResource(newBuffer) { _ => + return newBuffer.getHostMemoryBuffer + } + } catch { + case _: DuplicateBufferException => + logDebug(s"Lost host buffer registration race for buffer $id, retrying...") + } + } + } + throw new IllegalStateException(s"Unable to get host memory buffer for ID: $id") + } + /** * close() is called by client code to decrease the ref count of this RapidsBufferBase. * In the off chance that by the time close is invoked, the buffer was freed (not valid) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index 3a4c8cf1797..5003ba46184 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -140,7 +140,6 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) meta: TableMeta, spillPriority: Long) extends RapidsBufferBase(id, meta, spillPriority) { - private[this] var hostBuffer: Option[HostMemoryBuffer] = None // FIXME: Need to be clean up. Tracked in https://github.com/NVIDIA/spark-rapids/issues/9496 override val memoryUsedBytes: Long = uncompressedSize @@ -148,54 +147,40 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) override val storageTier: StorageTier = StorageTier.DISK override def getMemoryBuffer: MemoryBuffer = synchronized { - if (hostBuffer.isEmpty) { - require(onDiskSizeInBytes > 0, - s"$this attempted an invalid 0-byte mmap of a file") - val path = id.getDiskPath(diskBlockManager) - val serializerManager = diskBlockManager.getSerializerManager() - val memBuffer = if (serializerManager.isRapidsSpill(id)) { - // Only go through serializerManager's stream wrapper for spill case - closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed => - GpuTaskMetrics.get.readSpillFromDiskTime { - withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c => - c.position(fileOffset) - withResource(Channels.newInputStream(c)) { compressed => - withResource(serializerManager.wrapStream(id, compressed)) { in => - withResource(new HostMemoryOutputStream(decompressed)) { out => - IOUtils.copy(in, out) - } - decompressed + require(onDiskSizeInBytes > 0, + s"$this attempted an invalid 0-byte mmap of a file") + val path = id.getDiskPath(diskBlockManager) + val serializerManager = diskBlockManager.getSerializerManager() + val memBuffer = if (serializerManager.isRapidsSpill(id)) { + // Only go through serializerManager's stream wrapper for spill case + closeOnExcept(HostAlloc.alloc(uncompressedSize)) { + decompressed => GpuTaskMetrics.get.readSpillFromDiskTime { + withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c => + c.position(fileOffset) + withResource(Channels.newInputStream(c)) { compressed => + 
withResource(serializerManager.wrapStream(id, compressed)) { in => + withResource(new HostMemoryOutputStream(decompressed)) { out => + IOUtils.copy(in, out) } + decompressed } } } } - } else { - // Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and - // compression. - HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes) } - hostBuffer = Some(memBuffer) + } else { + // Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and + // compression. + HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes) } - hostBuffer.foreach(_.incRefCount()) - hostBuffer.get + memBuffer } override def close(): Unit = synchronized { - if (refcount == 1) { - // free the memory mapping since this is the last active reader - hostBuffer.foreach { b => - logDebug(s"closing mmap buffer $b") - b.close() - } - hostBuffer = None - } super.close() } override protected def releaseResources(): Unit = { - require(hostBuffer.isEmpty, - "Releasing a disk buffer with non-empty host buffer") // Buffers that share paths must be cleaned up elsewhere if (id.canShareDiskPaths) { sharedBufferFiles.remove(id) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index cdcdfea9715..32fe6229674 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -119,45 +119,57 @@ class RapidsHostMemoryStore( s"in the host store, skipping tier.") None } else { - withResource(other.getCopyIterator) { otherBufferIterator => - val isChunked = otherBufferIterator.isChunked - val totalCopySize = otherBufferIterator.getTotalCopySize - closeOnExcept(HostAlloc.tryAlloc(totalCopySize)) { hb => - hb.map { hostBuffer => - val spillNs = GpuTaskMetrics.get.spillToHostTime { - var hostOffset = 0L - val start = System.nanoTime() - while (otherBufferIterator.hasNext) { - val otherBuffer = otherBufferIterator.next() - withResource(otherBuffer) { _ => - otherBuffer match { - case devBuffer: DeviceMemoryBuffer => - hostBuffer.copyFromMemoryBufferAsync( - hostOffset, devBuffer, 0, otherBuffer.getLength, stream) - hostOffset += otherBuffer.getLength - case _ => - throw new IllegalStateException("copying from buffer without device memory") + // If the other is from the local disk store, we are unspilling to host memory. 
+ if (other.storageTier == StorageTier.DISK) { + logDebug(s"copying RapidsDiskStore buffer ${other.id} to a HostMemoryBuffer") + val hostBuffer = other.getMemoryBuffer.asInstanceOf[HostMemoryBuffer] + Some(new RapidsHostMemoryBuffer( + other.id, + hostBuffer.getLength(), + other.meta, + applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET), + hostBuffer)) + } else { + withResource(other.getCopyIterator) { otherBufferIterator => + val isChunked = otherBufferIterator.isChunked + val totalCopySize = otherBufferIterator.getTotalCopySize + closeOnExcept(HostAlloc.tryAlloc(totalCopySize)) { hb => + hb.map { hostBuffer => + val spillNs = GpuTaskMetrics.get.spillToHostTime { + var hostOffset = 0L + val start = System.nanoTime() + while (otherBufferIterator.hasNext) { + val otherBuffer = otherBufferIterator.next() + withResource(otherBuffer) { _ => + otherBuffer match { + case devBuffer: DeviceMemoryBuffer => + hostBuffer.copyFromMemoryBufferAsync( + hostOffset, devBuffer, 0, otherBuffer.getLength, stream) + hostOffset += otherBuffer.getLength + case _ => + throw new IllegalStateException("copying from buffer without device memory") + } } } + stream.sync() + System.nanoTime() - start } - stream.sync() - System.nanoTime() - start + val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong + val bw = (szMB.toDouble / (spillNs.toDouble / 1000000000.0)).toLong + logDebug(s"Spill to host (chunked=$isChunked) " + + s"size=$szMB MiB bandwidth=$bw MiB/sec") + new RapidsHostMemoryBuffer( + other.id, + totalCopySize, + other.meta, + applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET), + hostBuffer) + }.orElse { + // skip host + logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " + + s"in the host store, skipping tier.") + None } - val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong - val bw = (szMB.toDouble / (spillNs.toDouble / 1000000000.0)).toLong - logDebug(s"Spill to host (chunked=$isChunked) " + - s"size=$szMB MiB bandwidth=$bw MiB/sec") - new RapidsHostMemoryBuffer( - other.id, - totalCopySize, - other.meta, - applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET), - hostBuffer) - }.orElse { - // skip host - logWarning(s"Buffer $other with size ${other.memoryUsedBytes} does not fit " + - s"in the host store, skipping tier.") - None } } } @@ -177,7 +189,9 @@ class RapidsHostMemoryStore( with MemoryBuffer.EventHandler { override val storageTier: StorageTier = StorageTier.HOST - override def getMemoryBuffer: MemoryBuffer = synchronized { + override def getMemoryBuffer: MemoryBuffer = getHostMemoryBuffer + + override def getHostMemoryBuffer: HostMemoryBuffer = synchronized { buffer.synchronized { setSpillable(this, false) buffer.incRefCount() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 82efa7699ef..27c8bac497d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -371,41 +371,9 @@ class SpillableHostBuffer(handle: RapidsBufferHandle, handle.close() } - /** - * Acquires the underlying `RapidsBuffer` and uses - * `RapidsBuffer.withMemoryBufferReadLock` to obtain a read lock - * that will held while invoking `body` with a `HostMemoryBuffer`. 
- * @param body function that takes a `HostMemoryBuffer` and produces `K` - * @tparam K any return type specified by `body` - * @return the result of body(hostMemoryBuffer) - */ - def withHostBufferReadOnly[K](body: HostMemoryBuffer => K): K = { - withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => - rapidsBuffer.withMemoryBufferReadLock { - case hmb: HostMemoryBuffer => body(hmb) - case memoryBuffer => - throw new IllegalStateException( - s"Expected a HostMemoryBuffer but instead got ${memoryBuffer}") - } - } - } - - /** - * Acquires the underlying `RapidsBuffer` and uses - * `RapidsBuffer.withMemoryBufferWriteLock` to obtain a write lock - * that will held while invoking `body` with a `HostMemoryBuffer`. - * @param body function that takes a `HostMemoryBuffer` and produces `K` - * @tparam K any return type specified by `body` - * @return the result of body(hostMemoryBuffer) - */ - def withHostBufferWriteLock[K](body: HostMemoryBuffer => K): K = { + def getHostBuffer(): HostMemoryBuffer = { withResource(catalog.acquireBuffer(handle)) { rapidsBuffer => - rapidsBuffer.withMemoryBufferWriteLock { - case hmb: HostMemoryBuffer => body(hmb) - case memoryBuffer => - throw new IllegalStateException( - s"Expected a HostMemoryBuffer but instead got ${memoryBuffer}") - } + rapidsBuffer.getHostMemoryBuffer } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 61940ffd463..9b5b37af480 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import java.io.File -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer} +import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.StorageTier.{DEVICE, DISK, HOST, StorageTier} import com.nvidia.spark.rapids.format.TableMeta @@ -342,6 +342,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar { length: Long, stream: Cuda.Stream): Unit = {} override def getDeviceMemoryBuffer: DeviceMemoryBuffer = null + override def getHostMemoryBuffer: HostMemoryBuffer = null override def addReference(): Boolean = { if (_acquireAttempts > 0) { _acquireAttempts -= 1 diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala index 153b8da6556..1ffad031451 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala @@ -267,7 +267,7 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar { withResource(spillableBuffer) { _ => // the refcount of 1 is the store assertResult(1)(hmb.getRefCount) - spillableBuffer.withHostBufferReadOnly { memoryBuffer => + withResource(spillableBuffer.getHostBuffer()) { memoryBuffer => assertResult(hmb)(memoryBuffer) assertResult(2)(memoryBuffer.getRefCount) } @@ -278,33 +278,46 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar { } test("host buffer originated: get host memory buffer after spill") { + RapidsBufferCatalog.close() val spillPriority = -10 val hostStoreMaxSize = 1L * 1024 * 1024 - val bm = new RapidsDiskBlockManager(new SparkConf()) - withResource(new 
RapidsDiskStore(bm)) { diskStore => - withResource(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) { hostStore => - withResource(new RapidsDeviceMemoryStore) { devStore => - val catalog = new RapidsBufferCatalog(devStore, hostStore) - devStore.setSpillStore(hostStore) - hostStore.setSpillStore(diskStore) - val hmb = HostMemoryBuffer.allocate(1L * 1024) - val spillableBuffer = SpillableHostBuffer( - hmb, - hmb.getLength, - spillPriority, - catalog) - assertResult(1)(hmb.getRefCount) - // we spill it - catalog.synchronousSpill(hostStore, 0) - withResource(spillableBuffer) { _ => - // the refcount of the original buffer is 0 because it spilled - assertResult(0)(hmb.getRefCount) - spillableBuffer.withHostBufferReadOnly { memoryBuffer => - assertResult(memoryBuffer.getLength)(hmb.getLength) + try { + val bm = new RapidsDiskBlockManager(new SparkConf()) + val (catalog, devStore, hostStore, diskStore) = + closeOnExcept(new RapidsDiskStore(bm)) { diskStore => + closeOnExcept(new RapidsDeviceMemoryStore()) { devStore => + closeOnExcept(new RapidsHostMemoryStore(Some(hostStoreMaxSize))) { hostStore => + devStore.setSpillStore(hostStore) + hostStore.setSpillStore(diskStore) + val catalog = closeOnExcept( + new RapidsBufferCatalog(devStore, hostStore)) { catalog => catalog } + (catalog, devStore, hostStore, diskStore) } } } + + RapidsBufferCatalog.setDeviceStorage(devStore) + RapidsBufferCatalog.setHostStorage(hostStore) + RapidsBufferCatalog.setDiskStorage(diskStore) + RapidsBufferCatalog.setCatalog(catalog) + + val hmb = HostMemoryBuffer.allocate(1L * 1024) + val spillableBuffer = SpillableHostBuffer( + hmb, + hmb.getLength, + spillPriority) + assertResult(1)(hmb.getRefCount) + // we spill it + RapidsBufferCatalog.synchronousSpill(RapidsBufferCatalog.getHostStorage, 0) + withResource(spillableBuffer) { _ => + // the refcount of the original buffer is 0 because it spilled + assertResult(0)(hmb.getRefCount) + withResource(spillableBuffer.getHostBuffer()) { memoryBuffer => + assertResult(memoryBuffer.getLength)(hmb.getLength) + } } + } finally { + RapidsBufferCatalog.close() } } @@ -326,7 +339,7 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar { catalog) // spillable is 1K assertResult(hmb.getLength)(hostStore.currentSpillableSize) - spillableBuffer.withHostBufferReadOnly { memoryBuffer => + withResource(spillableBuffer.getHostBuffer()) { memoryBuffer => // 0 because we have a reference to the memoryBuffer assertResult(0)(hostStore.currentSpillableSize) val spilled = catalog.synchronousSpill(hostStore, 0) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index c4a531a8d7d..001f82ab3a0 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids import java.util.UUID -import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer} +import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} import com.nvidia.spark.rapids.{RapidsBuffer, RapidsBufferCatalog, RapidsBufferId, SpillableColumnarBatchImpl, StorageTier} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -54,6 +54,7 @@ class SpillableColumnarBatchSuite extends AnyFunSuite { override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, 
dstOffset: Long, length: Long, stream: Cuda.Stream): Unit = {} override def getDeviceMemoryBuffer: DeviceMemoryBuffer = null + override def getHostMemoryBuffer: HostMemoryBuffer = null override def addReference(): Boolean = true override def free(): Unit = {} override def getSpillPriority: Long = 0 From 38ea7ce70efa03c3832e68994a1783b40cc8de5c Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Tue, 19 Dec 2023 15:56:01 -0800 Subject: [PATCH 5/8] Add a fallback Cloudera Maven repo URL [skip ci] (#10081) Signed-off-by: Gera Shegalov --- pom.xml | 26 ++++++++++++++++++++++++-- scala2.13/pom.xml | 26 ++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 2b33c3339ba..e926cd6ffb0 100644 --- a/pom.xml +++ b/pom.xml @@ -844,6 +844,8 @@ -Djdk.reflect.useDirectMethodHandle=false false + https://repository.cloudera.com/artifactory/cloudera-repos + https://repository.cloudera.com/repository/cloudera-repos install ${spark.rapids.source.basedir}/.bloop ${project.build.outputDirectory}/rapids4spark-version-info.properties @@ -1731,7 +1733,17 @@ This will force full Scala code rebuild in downstream modules. cloudera-repo - https://repository.cloudera.com/artifactory/cloudera-repos/ + ${cloudera.repo.url} + + ${cloudera.repo.enabled} + + + ${cloudera.repo.enabled} + + + + cloudera-repo-fallback + ${cloudera.repo.url.fallback} ${cloudera.repo.enabled} @@ -1750,7 +1762,17 @@ This will force full Scala code rebuild in downstream modules. cloudera-repo - https://repository.cloudera.com/artifactory/cloudera-repos/ + ${cloudera.repo.url} + + ${cloudera.repo.enabled} + + + ${cloudera.repo.enabled} + + + + cloudera-repo-fallback + ${cloudera.repo.url.fallback} ${cloudera.repo.enabled} diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index e8a81bb77bd..7fc389cfad5 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -844,6 +844,8 @@ -Djdk.reflect.useDirectMethodHandle=false false + https://repository.cloudera.com/artifactory/cloudera-repos + https://repository.cloudera.com/repository/cloudera-repos install ${spark.rapids.source.basedir}/.bloop ${project.build.outputDirectory}/rapids4spark-version-info.properties @@ -1731,7 +1733,17 @@ This will force full Scala code rebuild in downstream modules. cloudera-repo - https://repository.cloudera.com/artifactory/cloudera-repos/ + ${cloudera.repo.url} + + ${cloudera.repo.enabled} + + + ${cloudera.repo.enabled} + + + + cloudera-repo-fallback + ${cloudera.repo.url.fallback} ${cloudera.repo.enabled} @@ -1750,7 +1762,17 @@ This will force full Scala code rebuild in downstream modules. 
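The pom hunks in this patch route the Cloudera repository through a `cloudera.repo.url` property and register a second `cloudera-repo-fallback` repository bound to `cloudera.repo.url.fallback`, so Maven consults the two URLs in order and only fails if neither answers. As a rough, build-tool-agnostic illustration of that ordered-fallback behavior (the helper below is hypothetical and not part of the patch; only the two URLs come from the pom properties):

```python
from urllib.request import Request, urlopen

# The primary and fallback repository URLs defined in the pom properties above.
MIRRORS = [
    "https://repository.cloudera.com/artifactory/cloudera-repos",
    "https://repository.cloudera.com/repository/cloudera-repos",
]

def first_reachable(urls, timeout=5):
    """Return the first mirror that answers, mimicking Maven's ordered lookup."""
    for url in urls:
        try:
            urlopen(Request(url, method="HEAD"), timeout=timeout)
            return url
        except OSError:
            continue  # unreachable mirror: fall through to the next candidate
    return None
```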
cloudera-repo - https://repository.cloudera.com/artifactory/cloudera-repos/ + ${cloudera.repo.url} + + ${cloudera.repo.enabled} + + + ${cloudera.repo.enabled} + + + + cloudera-repo-fallback + ${cloudera.repo.url.fallback} ${cloudera.repo.enabled} From 44ae358fcdb5f1d94f3eb706878271ac1212aab9 Mon Sep 17 00:00:00 2001 From: Navin Kumar <97137715+NVnavkumar@users.noreply.github.com> Date: Tue, 19 Dec 2023 21:45:51 -0800 Subject: [PATCH 6/8] Test full timestamp output range in PySpark [databricks] (#9996) * WIP: proposed approach to allowing testing of full Spark timestamp range Signed-off-by: Navin Kumar * Make the timestamp patch the default for all tests Signed-off-by: Navin Kumar * Handle toInternal side of these types and update workaround for hash_aggregate_test issues Signed-off-by: Navin Kumar * remove single use min_start Signed-off-by: Navin Kumar * Move this integration test to df.collect() instead of df.rdd Signed-off-by: Navin Kumar * Move the array sorting to Python outside of Spark to reduce overhead and avoid creating a new dataframe from collect Signed-off-by: Navin Kumar * Cleanup sort_locally method to make this function read cleaner Signed-off-by: Navin Kumar * More cleanup Signed-off-by: Navin Kumar --------- Signed-off-by: Navin Kumar --- integration_tests/src/main/python/asserts.py | 39 +++++++++------ integration_tests/src/main/python/conftest.py | 6 +++ integration_tests/src/main/python/data_gen.py | 16 +++--- .../src/main/python/date_time_test.py | 4 +- .../src/main/python/hash_aggregate_test.py | 43 ++++++---------- .../src/main/python/spark_session.py | 42 ++++++++++++++++ .../src/main/python/window_function_test.py | 49 ++++++++----------- 7 files changed, 114 insertions(+), 85 deletions(-) diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index 7d97c81af04..32416612d26 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from conftest import is_incompat, should_sort_on_spark, should_sort_locally, get_float_check, get_limit, spark_jvm +from conftest import is_incompat, should_sort_on_spark, should_sort_locally, array_columns_to_sort_locally, get_float_check, get_limit, spark_jvm from datetime import date, datetime, timedelta from decimal import Decimal import math @@ -220,6 +220,20 @@ def bring_back(spark): raise RuntimeError('Local Sort is only supported on a collect') return (bring_back, collect_type) +# Sort each of the result sets. 
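This patch also adds an `arrays` option to the `ignore_order` marker (wired up in the conftest.py hunk below) so a test can ask the harness to sort the contents of named array columns within every row before comparing CPU and GPU results, instead of routing through `sort_array()` on the CPU. A hedged usage sketch — the test name is hypothetical, while the marker, the helpers, and the `blist` alias all appear in the hunks that follow:

```python
# Assumes the usual test-module imports: from asserts import *,
# from data_gen import *, from marks import *, and pyspark.sql.functions as f.
@ignore_order(local=True, arrays=["blist"])  # sort rows AND the "blist" arrays locally
def test_collect_list_sketch():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, [('a', int_gen), ('b', int_gen)], length=100)
            .groupby('a')
            .agg(f.collect_list('b').alias("blist")))
```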
If there are array columns to sort, +# then sort each of those values in each row +def _sort_locally(*results): + array_columns = array_columns_to_sort_locally() + def sort_rows(rows): + if array_columns: + for r in rows: + for col in array_columns: + r[col].sort(key=_RowCmp) + rows.sort(key=_RowCmp) + + for rows in results: + sort_rows(rows) + def _prep_incompat_conf(conf): if is_incompat(): conf = dict(conf) # Make a copy before we change anything @@ -257,8 +271,7 @@ def _assert_gpu_and_cpu_writes_are_equal( from_cpu = with_cpu_session(cpu_bring_back, conf=conf) from_gpu = with_cpu_session(gpu_bring_back, conf=conf) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) @@ -317,8 +330,7 @@ def do_write(spark, table_name): from_cpu = with_cpu_session(cpu_bring_back, conf=conf) from_gpu = with_cpu_session(gpu_bring_back, conf=conf) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) @@ -366,8 +378,7 @@ def assert_gpu_fallback_write(write_func, from_cpu = with_cpu_session(cpu_bring_back, conf=conf) from_gpu = with_cpu_session(gpu_bring_back, conf=conf) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) finally: @@ -403,8 +414,7 @@ def assert_cpu_and_gpu_are_equal_collect_with_capture(func, print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type, gpu_end - gpu_start, cpu_end - cpu_start)) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) @@ -446,8 +456,7 @@ def assert_gpu_fallback_collect(func, print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type, gpu_end - gpu_start, cpu_end - cpu_start)) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) @@ -503,8 +512,7 @@ def run_on_gpu(): (from_cpu, from_gpu) = result_canonicalize_func_before_compare(from_cpu, from_gpu) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) assert_equal(from_cpu, from_gpu) @@ -530,7 +538,7 @@ def run_on_cpu(): print('### {}: CPU TOOK {} ###'.format(collect_type, cpu_end - cpu_start)) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) + _sort_locally(from_cpu) return from_cpu @@ -564,8 +572,7 @@ def run_on_gpu(): print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type, gpu_end - gpu_start, cpu_end - cpu_start)) if should_sort_locally(): - from_cpu.sort(key=_RowCmp) - from_gpu.sort(key=_RowCmp) + _sort_locally(from_cpu, from_gpu) return (from_cpu, from_gpu) diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py index e9cb9f54dbe..9daf4fb7e4a 100644 --- a/integration_tests/src/main/python/conftest.py +++ b/integration_tests/src/main/python/conftest.py @@ -41,6 +41,7 @@ def is_incompat(): _sort_on_spark = False _sort_locally = False +_sort_array_columns_locally = [] def should_sort_on_spark(): return _sort_on_spark @@ -48,6 +49,9 @@ def should_sort_on_spark(): def should_sort_locally(): return _sort_locally +def array_columns_to_sort_locally(): + return _sort_array_columns_locally + _allow_any_non_gpu = False _non_gpu_allowed = [] @@ -169,6 +173,7 @@ def get_std_input_path(): def 
pytest_runtest_setup(item): global _sort_on_spark global _sort_locally + global _sort_array_columns_locally global _inject_oom global _test_datagen_random_seed _inject_oom = item.get_closest_marker('inject_oom') @@ -188,6 +193,7 @@ def pytest_runtest_setup(item): if order.kwargs.get('local', False): _sort_on_spark = False _sort_locally = True + _sort_array_columns_locally = order.kwargs.get('arrays', []) else: _sort_on_spark = True _sort_locally = False diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index db8e1cced85..c83b08cb03a 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -584,19 +584,11 @@ class TimestampGen(DataGen): def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc): super().__init__(TimestampNTZType() if tzinfo==None else TimestampType(), nullable=nullable) if start is None: - # Spark supports times starting at - # "0001-01-01 00:00:00.000000" - # but it has issues if you get really close to that because it tries to do things - # in a different format which causes roundoff, so we have to add a few days, even a month, - # just to be sure - start = datetime(1, 2, 1, tzinfo=tzinfo) + start = datetime(1, 1, 1, tzinfo=tzinfo) elif not isinstance(start, datetime): raise RuntimeError('Unsupported type passed in for start {}'.format(start)) - # Spark supports time through: "9999-12-31 23:59:59.999999" - # but in order to avoid out-of-range error in non-UTC time zone, here use 9999-12-30 instead of 12-31 as max end - # for details, refer to https://github.com/NVIDIA/spark-rapids/issues/7535 - max_end = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=tzinfo) + max_end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo) if end is None: end = max_end elif isinstance(end, timedelta): @@ -612,6 +604,10 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc): self._start_time = self._to_us_since_epoch(start) self._end_time = self._to_us_since_epoch(end) self._tzinfo = tzinfo + + self.with_special_case(start) + self.with_special_case(end) + if (self._epoch >= start and self._epoch <= end): self.with_special_case(self._epoch) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 78586197d84..1787ec81cee 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -422,7 +422,7 @@ def test_string_unix_timestamp_ansi_exception(): error_message="Exception", conf=ansi_enabled_conf) -@pytest.mark.parametrize('data_gen', [StringGen('200[0-9]-0[1-9]-[0-2][1-8]')], ids=idfn) +@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{4}-0[1-9]-[0-2][1-8]')], ids=idfn) @pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF']) @allow_non_gpu(*non_utc_allow) def test_gettimestamp(data_gen, ansi_enabled): @@ -431,7 +431,7 @@ def test_gettimestamp(data_gen, ansi_enabled): {'spark.sql.ansi.enabled': ansi_enabled}) -@pytest.mark.parametrize('data_gen', [StringGen('0[1-9]200[0-9]')], ids=idfn) +@pytest.mark.parametrize('data_gen', [StringGen('0[1-9][0-9]{4}')], ids=idfn) @allow_non_gpu(*non_utc_allow) def test_gettimestamp_format_MMyyyy(data_gen): assert_gpu_and_cpu_are_equal_collect( diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 631ac8e71b1..609a096d55e 100644 --- 
a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -16,10 +16,11 @@ import pytest from asserts import * -from conftest import is_databricks_runtime +from conftest import is_databricks_runtime, spark_jvm from conftest import is_not_utc from data_gen import * from functools import reduce +from pyspark.sql.dataframe import DataFrame from pyspark.sql.types import * from marks import * import pyspark.sql.functions as f @@ -624,23 +625,19 @@ def test_min_max_group_by(data_gen): .agg(f.min('b'), f.max('b'))) # To avoid ordering issues with collect_list, sorting the arrays that are returned. -# Note, using sort_array() on the CPU, because sort_array() does not yet +# NOTE: sorting the arrays locally, because sort_array() does not yet # support sorting certain nested/arbitrary types on the GPU # See https://github.com/NVIDIA/spark-rapids/issues/3715 # and https://github.com/rapidsai/cudf/issues/11222 -@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow) -@ignore_order(local=True) +@allow_non_gpu("ProjectExec", *non_utc_allow) +@ignore_order(local=True, arrays=["blist"]) @pytest.mark.parametrize('data_gen', _gen_data_for_collect_list_op, ids=idfn) @pytest.mark.parametrize('use_obj_hash_agg', [True, False], ids=idfn) def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg): def doit(spark): - df = gen_df(spark, data_gen, length=100)\ + return gen_df(spark, data_gen, length=100)\ .groupby('a')\ .agg(f.collect_list('b').alias("blist")) - # pull out the rdd and schema and create a new dataframe to run SortArray - # to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec - # to ObjectHashAggregateExec - return spark.createDataFrame(df.rdd, schema=df.schema).select("a", f.sort_array("blist")) assert_gpu_and_cpu_are_equal_collect( doit, conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()}) @@ -680,28 +677,22 @@ def test_hash_groupby_collect_set_on_nested_type(data_gen): .agg(f.sort_array(f.collect_set('b')))) -# Note, using sort_array() on the CPU, because sort_array() does not yet +# NOTE: sorting the arrays locally, because sort_array() does not yet # support sorting certain nested/arbitrary types on the GPU # See https://github.com/NVIDIA/spark-rapids/issues/3715 # and https://github.com/rapidsai/cudf/issues/11222 -@ignore_order(local=True) -@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow) +@ignore_order(local=True, arrays=["collect_set"]) +@allow_non_gpu("ProjectExec", *non_utc_allow) @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn) def test_hash_groupby_collect_set_on_nested_array_type(data_gen): conf = copy_and_update(_float_conf, { "spark.rapids.sql.castFloatToString.enabled": "true", - "spark.rapids.sql.expression.SortArray": "false" }) def do_it(spark): - df = gen_df(spark, data_gen, length=100)\ + return gen_df(spark, data_gen, length=100)\ .groupby('a')\ .agg(f.collect_set('b').alias("collect_set")) - # pull out the rdd and schema and create a new dataframe to run SortArray - # to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec - # to ObjectHashAggregateExec - return spark.createDataFrame(df.rdd, schema=df.schema)\ - .selectExpr("sort_array(collect_set)") assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) @@ -723,27 +714,21 @@ def test_hash_reduction_collect_set_on_nested_type(data_gen): .agg(f.sort_array(f.collect_set('b')))) -# Note, using sort_array() on the CPU, because sort_array() 
does not yet +# NOTE: sorting the arrays locally, because sort_array() does not yet # support sorting certain nested/arbitrary types on the GPU # See https://github.com/NVIDIA/spark-rapids/issues/3715 # and https://github.com/rapidsai/cudf/issues/11222 -@ignore_order(local=True) -@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow) +@ignore_order(local=True, arrays=["collect_set"]) +@allow_non_gpu("ProjectExec", *non_utc_allow) @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn) def test_hash_reduction_collect_set_on_nested_array_type(data_gen): conf = copy_and_update(_float_conf, { "spark.rapids.sql.castFloatToString.enabled": "true", - "spark.rapids.sql.expression.SortArray": "false" }) def do_it(spark): - df = gen_df(spark, data_gen, length=100)\ + return gen_df(spark, data_gen, length=100)\ .agg(f.collect_set('b').alias("collect_set")) - # pull out the rdd and schema and create a new dataframe to run SortArray - # to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec - # to ObjectHashAggregateExec - return spark.createDataFrame(df.rdd, schema=df.schema)\ - .selectExpr("sort_array(collect_set)") assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 279924b700c..aca1ea82d62 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -13,9 +13,14 @@ # limitations under the License. import os +import calendar, time +from datetime import date, datetime +from contextlib import contextmanager, ExitStack from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime, is_at_least_precommit_run, get_inject_oom_conf from pyspark.sql import DataFrame +from pyspark.sql.types import TimestampType, DateType, _acceptable_types from spark_init_internal import get_spark_i_know_what_i_am_doing, spark_version +from unittest.mock import patch def _from_scala_map(scala_map): ret = {} @@ -81,6 +86,43 @@ def _check_for_proper_return_values(something): if (isinstance(something, DataFrame)): raise RuntimeError("You should never return a DataFrame from a with_*_session, you will not get the results that you expect") +@contextmanager +def pyspark_compatibility_fixes(): + def timestampToInternal(_, dt): + if isinstance(dt, int): + return dt + if isinstance(dt, datetime): + seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) + return int(seconds) * 1000000 + dt.microsecond + + def dateToInternal(self, d): + if isinstance(d, int): + return d + if isinstance(d, (datetime, date)): + return d.toordinal() - self.EPOCH_ORDINAL + + pyspark_compatibility_fixes = [] + # Patch timestamp and date types so that we only compare in testing to the internal integral representations + pyspark_compatibility_fixes.append(patch.object(TimestampType, 'fromInternal', lambda _, ts: ts)) + pyspark_compatibility_fixes.append(patch.object(TimestampType, 'toInternal', timestampToInternal)) + pyspark_compatibility_fixes.append(patch.dict(_acceptable_types, { TimestampType: (datetime, int)})) + try: + from pyspark.sql.types import TimestampNTZType + pyspark_compatibility_fixes.append(patch.object(TimestampNTZType, 'fromInternal', lambda _, ts: ts)) + pyspark_compatibility_fixes.append(patch.object(TimestampNTZType, 'toInternal', timestampToInternal)) + 
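The `toInternal`/`fromInternal` patches above make the tests exchange Spark's internal encodings directly: microseconds since the Unix epoch for timestamps, and epoch-relative ordinals for dates. A self-contained sketch of the timestamp encoding for timezone-aware values — `EPOCH` and `to_internal_micros` are illustrative names of my own, not the patched code:

```python
from datetime import datetime, timedelta, timezone

# Spark's internal TimestampType value: microseconds since 1970-01-01 00:00:00 UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_internal_micros(dt):
    return (dt - EPOCH) // timedelta(microseconds=1)

assert to_internal_micros(EPOCH) == 0
assert to_internal_micros(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)) == 1_000_000
# The full Spark range becomes representable, which is why the data_gen.py hunk
# above can drop the old safety margin and generate from 0001-01-01 onward.
assert to_internal_micros(datetime(1, 1, 1, tzinfo=timezone.utc)) < 0
```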
pyspark_compatibility_fixes.append(patch.dict(_acceptable_types, { TimestampNTZType: (datetime, int)})) + except ImportError: + pass + pyspark_compatibility_fixes.append(patch.object(DateType, 'fromInternal', lambda _, v: v)) + pyspark_compatibility_fixes.append(patch.object(DateType, 'toInternal', dateToInternal)) + pyspark_compatibility_fixes.append(patch.dict(_acceptable_types, { DateType: (datetime, date, int)})) + with ExitStack() as stack: + for cm in pyspark_compatibility_fixes: + stack.enter_context(cm) + yield + +@pyspark_compatibility_fixes() def with_spark_session(func, conf={}): """Run func that takes a spark session as input with the given configs set.""" reset_spark_session_conf() diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 59c763c5505..f9f3b063a97 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1478,18 +1478,34 @@ def test_window_aggs_for_rows_collect_set(): # support sorting certain nested/arbitrary types on the GPU # See https://github.com/NVIDIA/spark-rapids/issues/3715 # and https://github.com/rapidsai/cudf/issues/11222 -@ignore_order(local=True) -@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow) +@ignore_order(local=True, arrays=[ + "cc_struct_array_1", + "cc_struct_array_2", + "cc_array_struct", + "cc_array_array_bool", + "cc_array_array_int", + "cc_array_array_long", + "cc_array_array_short", + "cc_array_array_date", + "cc_array_array_ts", + "cc_array_array_byte", + "cc_array_array_str", + "cc_array_array_float", + "cc_array_array_double", + "cc_array_array_decimal_32", + "cc_array_array_decimal_64", + "cc_array_array_decimal_128" +]) +@allow_non_gpu("ProjectExec", *non_utc_allow) def test_window_aggs_for_rows_collect_set_nested_array(): conf = copy_and_update(_float_conf, { "spark.rapids.sql.castFloatToString.enabled": "true", - "spark.rapids.sql.expression.SortArray": "false" }) def do_it(spark): df = gen_df(spark, _gen_data_for_collect_set_nested, length=512) df.createOrReplaceTempView("window_collect_table") - df = spark.sql( + return spark.sql( """select a, b, collect_set(c_struct_array_1) over (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_struct_array_1, @@ -1525,30 +1541,6 @@ def do_it(spark): (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_128 from window_collect_table """) - df = spark.createDataFrame(df.rdd, schema=df.schema) - # pull out the rdd and schema and create a new dataframe to run SortArray - # to handle Databricks 10.4+ optimization that moves SortArray from ProjectExec - # to ObjectHashAggregateExec - df.createOrReplaceTempView("window_collect_table_2") - return spark.sql("""select a, b, - sort_array(cc_struct_array_1), - sort_array(cc_struct_array_2), - sort_array(cc_array_struct), - sort_array(cc_array_array_bool), - sort_array(cc_array_array_int), - sort_array(cc_array_array_long), - sort_array(cc_array_array_short), - sort_array(cc_array_array_date), - sort_array(cc_array_array_ts), - sort_array(cc_array_array_byte), - sort_array(cc_array_array_str), - sort_array(cc_array_array_float), - sort_array(cc_array_array_double), - sort_array(cc_array_array_decimal_32), - sort_array(cc_array_array_decimal_64), - sort_array(cc_array_array_decimal_128) - from window_collect_table_2 - """) assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) @@ -1564,6 +1556,7 
@@ def test_nested_part_fallback(part_gen): ('a', RepeatSeqGen(part_gen, length=20)), ('b', UniqueLongGen()), ('c', int_gen)] + window_spec = Window.partitionBy('a').orderBy('b').rowsBetween(-5, 5) def do_it(spark): From 0ca4a2c4184702109d96cdb7d130519e765985f9 Mon Sep 17 00:00:00 2001 From: Sean Lee Date: Wed, 20 Dec 2023 13:34:30 -0800 Subject: [PATCH 7/8] Add support for StringConcatFactory.makeConcatWithConstants (#9555) (#10057) * Add support for StringConcatFactory.makeConcatWithConstants (#9555) Signed-off-by: Sean Lee * Update copyright year Signed-off-by: Sean Lee --------- Signed-off-by: Sean Lee --- .../com/nvidia/spark/udf/Instruction.scala | 33 ++++++++++++++++++- .../nvidia/spark/udf/LambdaReflection.scala | 28 +++++++++++++++- 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/udf-compiler/src/main/scala-2.13/com/nvidia/spark/udf/Instruction.scala b/udf-compiler/src/main/scala-2.13/com/nvidia/spark/udf/Instruction.scala index ccd591ac4f5..36fe79da384 100644 --- a/udf-compiler/src/main/scala-2.13/com/nvidia/spark/udf/Instruction.scala +++ b/udf-compiler/src/main/scala-2.13/com/nvidia/spark/udf/Instruction.scala @@ -314,6 +314,12 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend val (args, rest) = stack.splitAt(n + 1) (args.reverse, rest) }) + case Opcode.INVOKEDYNAMIC => + invokedynamic(lambdaReflection, state, + (stack, n) => { + val (args, rest) = stack.splitAt(n) + (args.reverse, rest) + }) case _ => throw new SparkException("Unsupported instruction: " + instructionStr) } logDebug(s"[Instruction] ${instructionStr} got new state: ${st} from state: ${state}") @@ -563,6 +569,31 @@ case class Instruction(opcode: Int, operand: Int, instructionStr: String) extend } } + private def invokedynamic(lambdaReflection: LambdaReflection, state: State, + getArgs: (List[Expression], Int) => + (List[Expression], List[Expression])): State = { + val State(locals, stack, cond, expr) = state + val (bootstrapMethod, bootstrapArgs) = lambdaReflection.lookupBootstrapMethod(operand) + val declaringClass = bootstrapMethod.getDeclaringClass + val declaringClassName = declaringClass.getName + val newstack = { + if (declaringClassName.equals("java.lang.invoke.StringConcatFactory") && + bootstrapMethod.getName.equals("makeConcatWithConstants") && + bootstrapArgs.length == 1) { + val recipe = bootstrapArgs.head.toString + if (recipe.contains('\u0002')) { + throw new SparkException("Unsupported instruction: " + instructionStr) + } + val (args, rest) = getArgs(stack, recipe.count{x => x == '\u0001'}) + Concat(recipe.split('\u0001').zipAll(args, "", Literal("")) + .map{ case(x, y) => Concat(Seq(Literal(x), y))}.toSeq) :: rest + } else { + throw new SparkException("Unsupported instruction: " + instructionStr) + } + } + State(locals, newstack, cond, expr) + } + private def checkArgs(methodName: String, expectedTypes: List[DataType], args: List[Expression]): Unit = { @@ -958,7 +989,7 @@ object Instruction { codeIterator.byteAt(offset + 1) case Opcode.BIPUSH => codeIterator.signedByteAt(offset + 1) - case Opcode.LDC_W | Opcode.LDC2_W | Opcode.NEW | Opcode.CHECKCAST | + case Opcode.LDC_W | Opcode.LDC2_W | Opcode.NEW | Opcode.CHECKCAST | Opcode.INVOKEDYNAMIC | Opcode.INVOKESTATIC | Opcode.INVOKEVIRTUAL | Opcode.INVOKEINTERFACE | Opcode.INVOKESPECIAL | Opcode.GETSTATIC => codeIterator.u16bitAt(offset + 1) diff --git a/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala b/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala index 
cb21aacc9e5..df9387d26af 100644 --- a/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala +++ b/udf-compiler/src/main/scala/com/nvidia/spark/udf/LambdaReflection.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,32 @@ class LambdaReflection private(private val classPool: ClassPool, } } + def lookupBootstrapMethod(constPoolIndex: Int): (CtMethod, Seq[Any]) = { + if (constPool.getTag(constPoolIndex) != ConstPool.CONST_InvokeDynamic) { + throw new SparkException(s"Unexpected index ${constPoolIndex} for bootstrap") + } + val bootstrapMethodIndex = constPool.getInvokeDynamicBootstrap(constPoolIndex) + val bootstrapMethodsAttribute = ctMethod.getDeclaringClass.getClassFile.getAttributes + .toArray.filter(_.isInstanceOf[javassist.bytecode.BootstrapMethodsAttribute]) + if (bootstrapMethodsAttribute.length != 1) { + throw new SparkException(s"Multiple bootstrap methods attributes aren't supported") + } + val bootstrapMethods = bootstrapMethodsAttribute.head + .asInstanceOf[javassist.bytecode.BootstrapMethodsAttribute].getMethods + val bootstrapMethod = bootstrapMethods(bootstrapMethodIndex) + val bootstrapMethodArguments = try { + bootstrapMethod.arguments.map(lookupConstant) + } catch { + case _: Throwable => + throw new SparkException(s"only constants are supported as bootstrap method arguments") + } + val constPoolIndexMethodref = constPool.getMethodHandleIndex(bootstrapMethod.methodRef) + val methodName = constPool.getMethodrefName(constPoolIndexMethodref) + val descriptor = constPool.getMethodrefType(constPoolIndexMethodref) + val className = constPool.getMethodrefClassName(constPoolIndexMethodref) + (classPool.getCtClass(className).getMethod(methodName, descriptor), bootstrapMethodArguments) + } + def lookupClassName(constPoolIndex: Int): String = { if (constPool.getTag(constPoolIndex) != ConstPool.CONST_Class) { throw new SparkException("Unexpected index for class") From bb235c9cc354299a78f26fecbaf9be67bcf5ae39 Mon Sep 17 00:00:00 2001 From: Navin Kumar <97137715+NVnavkumar@users.noreply.github.com> Date: Wed, 20 Dec 2023 16:00:56 -0800 Subject: [PATCH 8/8] Cleanup usage of non-utc configuration here (#10071) Signed-off-by: Navin Kumar --- integration_tests/src/main/python/date_time_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 1787ec81cee..ed60c3ab1ec 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -456,21 +456,17 @@ def test_date_format_for_date(data_gen, date_format): @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) @pytest.mark.skipif(not is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported") def test_date_format_for_time(data_gen, date_format): - conf = {'spark.rapids.sql.nonUTC.enabled': True} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)), - conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format))) @pytest.mark.parametrize('date_format', 
supported_date_formats, ids=idfn) @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) @pytest.mark.skipif(is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported") @allow_non_gpu('ProjectExec') def test_date_format_for_time_fall_back(data_gen, date_format): - conf = {'spark.rapids.sql.nonUTC.enabled': True} assert_gpu_fallback_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)), - 'ProjectExec', - conf) + 'ProjectExec') @pytest.mark.parametrize('date_format', supported_date_formats + ['yyyyMMdd'], ids=idfn) # from 0001-02-01 to 9999-12-30 to avoid 'year 0 is out of range'
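With `spark.rapids.sql.nonUTC.enabled` removed, the tests above are driven purely by the session time zone: supported zones run `date_format` on the GPU, unsupported ones assert a CPU fallback at the plan level. A minimal sketch of the resulting pattern — the test name and the fixed format string are hypothetical; the marker and helpers are the ones used above:

```python
# Assumes the usual test-module imports from asserts, data_gen, and marks.
@allow_non_gpu('ProjectExec')
def test_date_format_fallback_sketch():
    # No extra conf is needed any more; the fallback is asserted via the plan alone.
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, timestamp_gen)
            .selectExpr("date_format(a, 'yyyy-MM-dd')"),
        'ProjectExec')
```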