diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2aab89d6f26..0340afa5931 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,5 @@
 # Change log
-Generated on 2024-05-09
+Generated on 2024-05-20
 
 ## Release 24.04
 
@@ -29,6 +29,7 @@ Generated on 2024-05-09
 ### Bugs Fixed
 |||
 |:---|:---|
+|[#10700](https://github.com/NVIDIA/spark-rapids/issues/10700)|[BUG] get_json_object cannot handle ints or boolean values|
 |[#10645](https://github.com/NVIDIA/spark-rapids/issues/10645)|[BUG] java.lang.IllegalStateException: Expected to only receive a single batch|
 |[#10665](https://github.com/NVIDIA/spark-rapids/issues/10665)|[BUG] Need to update private jar's version to v24.04.1 for spark-rapids v24.04.0 release|
 |[#10589](https://github.com/NVIDIA/spark-rapids/issues/10589)|[BUG] ZSTD version mismatch in integration tests|
@@ -85,6 +86,7 @@ Generated on 2024-05-09
 |||
 |:---|:---|
 |[#10782](https://github.com/NVIDIA/spark-rapids/pull/10782)|Update latest changelog [skip ci]|
+|[#10780](https://github.com/NVIDIA/spark-rapids/pull/10780)|[DOC]Update download page for v24.04.1 [skip ci]|
 |[#10777](https://github.com/NVIDIA/spark-rapids/pull/10777)|Update rapids JNI dependency: private to 24.04.2|
 |[#10683](https://github.com/NVIDIA/spark-rapids/pull/10683)|Update latest changelog [skip ci]|
 |[#10681](https://github.com/NVIDIA/spark-rapids/pull/10681)|Update rapids JNI dependency to 24.04.0, private to 24.04.1|
diff --git a/docs/archive.md b/docs/archive.md
index e2e23a26f11..6cce30557f4 100644
--- a/docs/archive.md
+++ b/docs/archive.md
@@ -5,6 +5,52 @@ nav_order: 15
 ---
 Below are archived releases for RAPIDS Accelerator for Apache Spark.
+## Release v24.04.0
+### Hardware Requirements:
+
+The plugin is tested on the following architectures:
+
+	[GPU models, software requirements, and minimum-driver notes elided in this excerpt]
+
+### RAPIDS Accelerator's Support Policy for Apache Spark
+The RAPIDS Accelerator maintains support for Apache Spark versions available for download from [Apache Spark](https://spark.apache.org/downloads.html)
+
+### Download RAPIDS Accelerator for Apache Spark v24.04.0
+
+| Processor | Scala Version | Download Jar | Download Signature |
+|-----------|---------------|--------------|--------------------|
+| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar.asc) |
+| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar.asc) |
+| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar.asc) |
+| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar.asc) |
+
+This package is built against CUDA 11.8. It is tested on V100, T4, A10, A100, L4 and H100 GPUs with
+CUDA 11.8 through CUDA 12.0.
+
+### Verify signature
+* Download the [PUB_KEY](https://keys.openpgp.org/search?q=sw-spark@nvidia.com).
+* Import the public key: `gpg --import PUB_KEY`
+* Verify the signature for the Scala 2.12 jar:
+  `gpg --verify rapids-4-spark_2.12-24.04.0.jar.asc rapids-4-spark_2.12-24.04.0.jar`
+* Verify the signature for the Scala 2.13 jar:
+  `gpg --verify rapids-4-spark_2.13-24.04.0.jar.asc rapids-4-spark_2.13-24.04.0.jar`
+
+The output of the signature verification should include:
+
+	gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) <sw-spark@nvidia.com>"
+
+### Release Notes
+New functionality and performance improvements for this release include:
+* Performance improvements for S3 reading.
+  Refer to perfio.s3.enabled in [advanced_configs](./additional-functionality/advanced_configs.md) for more details.
+* Performance improvements when doing joins on unique keys.
+* Enhanced decompression kernels for zstd and snappy.
+* Enhanced Parquet reading performance with modular kernels.
+* Added compatibility with Spark version 3.5.1.
+* Deprecated support for Databricks 10.4 ML LTS.
+* For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases).
+
+For a detailed list of changes, please refer to the
+[CHANGELOG](https://github.com/NVIDIA/spark-rapids/blob/main/CHANGELOG.md).
+
 ## Release v24.02.0
 ### Hardware Requirements:
diff --git a/pom.xml b/pom.xml
index 778b1f833dd..96bb90f7206 100644
--- a/pom.xml
+++ b/pom.xml
@@ -699,7 +699,7 @@
     cuda11
     ${cuda.version}
     24.04.0
-    24.04.2
+    24.04.3
     2.12
     2.8.0
     incremental
diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml
index f03cc29b8d2..d46eb1a1079 100644
--- a/scala2.13/pom.xml
+++ b/scala2.13/pom.xml
@@ -699,7 +699,7 @@
     cuda11
     ${cuda.version}
     24.04.0
-    24.04.2
+    24.04.3
     2.13
     2.8.0
     incremental
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
index 3f3f2803f5c..bc2f30dff2f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
-import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
+import org.apache.spark.sql.rapids.execution.python.shims.GpuGroupedPythonRunnerFactory
+import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -109,8 +109,6 @@ case class GpuAggregateInPandasExec(
     val (mNumInputRows, mNumInputBatches, mNumOutputRows, mNumOutputBatches) = commonGpuMetrics()
     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
-    val sessionLocalTimeZone = conf.sessionLocalTimeZone
-    val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
     val childOutput = child.output
     val resultExprs = resultExpressions
@@ -204,27 +202,22 @@ case class GpuAggregateInPandasExec(
       }
     }
 
+    val runnerFactory = GpuGroupedPythonRunnerFactory(conf, pyFuncs, argOffsets,
+      aggInputSchema, DataTypeUtilsShim.fromAttributes(pyOutAttributes),
+      PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+
     // Third, sends to Python to execute the aggregate and returns the result.
     if (pyInputIter.hasNext) {
       // Launch Python workers only when the data is not empty.
-      val pyRunner = new GpuArrowPythonRunner(
-        pyFuncs,
-        PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
-        argOffsets,
-        aggInputSchema,
-        sessionLocalTimeZone,
-        pythonRunnerConf,
-        // The whole group data should be written in a single call, so here is unlimited
-        Int.MaxValue,
-        DataTypeUtilsShim.fromAttributes(pyOutAttributes))
-
+      val pyRunner = runnerFactory.getRunner()
       val pyOutputIterator = pyRunner.compute(pyInputIter, context.partitionId(), context)
       val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes
       val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs)
       // Gets the combined batch for each group and projects for the output.
-      new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner,
-        mNumOutputRows, mNumOutputBatches).map { combinedBatch =>
+      new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator,
+        pyRunner.asInstanceOf[GpuArrowOutput], mNumOutputRows,
+        mNumOutputBatches).map { combinedBatch =>
         withResource(combinedBatch) { batch =>
           GpuProjectExec.project(batch, resultRefs)
         }
       }
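The hunk above preserves one control-flow detail worth noting: the factory is now constructed eagerly (it only captures configuration), while `getRunner()`, which actually starts a Python worker, is still called only when the partition has data. Below is a minimal, self-contained Scala sketch of that pattern; `Runner`, `RunnerFactory`, and `processPartition` are simplified stand-ins invented for illustration, not the plugin's real API.

```scala
object LazyRunnerLaunchSketch {
  // Stand-in for a runner whose construction would start an external worker.
  final case class Runner(evalType: Int) {
    def compute(input: Iterator[Int]): Iterator[Int] = input.map(_ + 1)
  }

  // Cheap to build eagerly: it only captures the configuration needed later.
  final case class RunnerFactory(evalType: Int) {
    def getRunner(): Runner = Runner(evalType)
  }

  // Mirrors the exec's guard: launch the worker only for non-empty input.
  def processPartition(input: Iterator[Int], factory: RunnerFactory): Iterator[Int] = {
    if (input.hasNext) factory.getRunner().compute(input)
    else Iterator.empty
  }

  def main(args: Array[String]): Unit = {
    val factory = RunnerFactory(evalType = 202) // SQL_GROUPED_AGG_PANDAS_UDF in Spark
    println(processPartition(Iterator(1, 2, 3), factory).toList) // List(2, 3, 4)
    println(processPartition(Iterator.empty, factory).toList)    // List()
  }
}
```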
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
index 6c2f716583f..4a24a449b24 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
 import org.apache.spark.TaskContext
-import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
@@ -123,7 +123,8 @@ case class GpuFlatMapGroupsInPandasExec(
       resolveArgOffsets(child, groupingAttributes)
 
     val runnerFactory = GpuGroupedPythonRunnerFactory(conf, chainedFunc, Array(argOffsets),
-      DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema)
+      DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema,
+      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     // Start processing. Map grouped batches to ArrowPythonRunner results.
     child.executeColumnar().mapPartitionsInternal { inputIter =>
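Together with the previous file, these are the two call sites of the new factory parameter: the aggregate exec passes `SQL_GROUPED_AGG_PANDAS_UDF` while the flat-map exec passes `SQL_GROUPED_MAP_PANDAS_UDF`, so one shim factory can serve both. A hedged sketch of that idea follows; the numeric constants mirror Spark's `org.apache.spark.api.python.PythonEvalType`, but `ArrowRunner` and `RunnerFactory` are simplified stand-ins, not the plugin's API.

```scala
object EvalTypeFactorySketch {
  // Stand-in for the runner the factory would build (not GpuArrowPythonRunner).
  final case class ArrowRunner(evalType: Int)

  // These values mirror org.apache.spark.api.python.PythonEvalType.
  val SQL_GROUPED_MAP_PANDAS_UDF = 201
  val SQL_GROUPED_AGG_PANDAS_UDF = 202

  // Before this change the factory hard-coded SQL_GROUPED_MAP_PANDAS_UDF;
  // now each exec passes the eval type matching its kind of Pandas UDF.
  final case class RunnerFactory(evalType: Int) {
    def getRunner(): ArrowRunner = ArrowRunner(evalType)
  }

  def main(args: Array[String]): Unit = {
    val forFlatMapGroups = RunnerFactory(SQL_GROUPED_MAP_PANDAS_UDF).getRunner()
    val forAggregate = RunnerFactory(SQL_GROUPED_AGG_PANDAS_UDF).getRunner()
    println(s"grouped map -> ${forFlatMapGroups.evalType}, grouped agg -> ${forAggregate.evalType}")
  }
}
```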
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index 4186effcf84..a35cba87a16 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,7 +39,7 @@ spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.rapids.execution.python.shims
 
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -49,14 +49,15 @@ case class GpuGroupedPythonRunnerFactory(
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   val sessionLocalTimeZone = conf.sessionLocalTimeZone
   val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
 
   def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
     new GpuArrowPythonRunner(
       chainedFunc,
-      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+      evalType,
       argOffsets,
       dedupAttrs,
       sessionLocalTimeZone,
@@ -65,4 +66,4 @@ case class GpuGroupedPythonRunnerFactory(
       Int.MaxValue,
       pythonOutputSchema)
   }
-}
\ No newline at end of file
+}
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index c97bf1abd3e..451de0a2527 100644
--- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -20,7 +20,7 @@ spark-rapids-shim-json-lines ***/
 package org.apache.spark.sql.rapids.execution.python.shims
 
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -30,7 +30,8 @@ case class GpuGroupedPythonRunnerFactory(
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   // Configs from DB runtime
   val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
   val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
@@ -41,7 +42,7 @@ case class GpuGroupedPythonRunnerFactory(
     if (zeroConfEnabled && maxBytes > 0L) {
       new GpuGroupUDFArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
@@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
     } else {
       new GpuArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index f2297248711..b1dabbf5b5e 100644
--- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -24,24 +24,25 @@
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-//TODO is this needed? we already have a similar version in spark330db
 case class GpuGroupedPythonRunnerFactory(
     conf: org.apache.spark.sql.internal.SQLConf,
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   // Configs from DB runtime
   val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
   val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
+  val isArrowBatchSlicingEnabled = conf.pythonArrowBatchSlicingEnabled
   val sessionLocalTimeZone = conf.sessionLocalTimeZone
   val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
 
   def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
-    if (zeroConfEnabled && maxBytes > 0L) {
+    if (isArrowBatchSlicingEnabled || (zeroConfEnabled && maxBytes > 0L)) {
       new GpuGroupUDFArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
@@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
     } else {
       new GpuArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
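The spark341db shim above changes the runner selection in two ways: the eval type is threaded through as in the other shims, and the new `isArrowBatchSlicingEnabled` flag now also routes to `GpuGroupUDFArrowPythonRunner`, in addition to the Databricks zero-conf conversion path. A minimal sketch of just that predicate, assuming plain stand-ins for the two runner classes:

```scala
object RunnerSelectionSketch {
  sealed trait Runner
  case object GroupUDFArrowRunner extends Runner // stand-in for GpuGroupUDFArrowPythonRunner
  case object ArrowRunner extends Runner         // stand-in for GpuArrowPythonRunner

  // Mirrors the condition in getRunner() above: Arrow batch slicing, or the
  // zero-conf conversion with a positive slice size, selects the group-UDF
  // runner; everything else falls back to the plain Arrow runner.
  def select(batchSlicingEnabled: Boolean,
      zeroConfEnabled: Boolean,
      maxBytesPerSlice: Long): Runner = {
    if (batchSlicingEnabled || (zeroConfEnabled && maxBytesPerSlice > 0L)) {
      GroupUDFArrowRunner
    } else {
      ArrowRunner
    }
  }

  def main(args: Array[String]): Unit = {
    assert(select(batchSlicingEnabled = true, zeroConfEnabled = false, maxBytesPerSlice = 0L) == GroupUDFArrowRunner)
    assert(select(batchSlicingEnabled = false, zeroConfEnabled = true, maxBytesPerSlice = 1L) == GroupUDFArrowRunner)
    assert(select(batchSlicingEnabled = false, zeroConfEnabled = false, maxBytesPerSlice = 1L) == ArrowRunner)
    println("runner selection matches the new predicate")
  }
}
```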