Merge branch-24.04 into main [skip ci] #108

Closed · wants to merge 5 commits into main from branch-24.04
CHANGELOG.md (3 additions, 1 deletion)
@@ -1,5 +1,5 @@
# Change log
Generated on 2024-05-09
Generated on 2024-05-20

## Release 24.04

@@ -29,6 +29,7 @@ Generated on 2024-05-09
### Bugs Fixed
|||
|:---|:---|
|[#10700](https://github.com/NVIDIA/spark-rapids/issues/10700)|[BUG] get_json_object cannot handle ints or boolean values|
|[#10645](https://github.com/NVIDIA/spark-rapids/issues/10645)|[BUG] java.lang.IllegalStateException: Expected to only receive a single batch|
|[#10665](https://github.com/NVIDIA/spark-rapids/issues/10665)|[BUG] Need to update private jar's version to v24.04.1 for spark-rapids v24.04.0 release|
|[#10589](https://github.com/NVIDIA/spark-rapids/issues/10589)|[BUG] ZSTD version mismatch in integration tests|
@@ -85,6 +86,7 @@ Generated on 2024-05-09
|||
|:---|:---|
|[#10782](https://github.com/NVIDIA/spark-rapids/pull/10782)|Update latest changelog [skip ci]|
|[#10780](https://github.com/NVIDIA/spark-rapids/pull/10780)|[DOC]Update download page for v24.04.1 [skip ci]|
|[#10777](https://github.com/NVIDIA/spark-rapids/pull/10777)|Update rapids JNI dependency: private to 24.04.2|
|[#10683](https://github.com/NVIDIA/spark-rapids/pull/10683)|Update latest changelog [skip ci]|
|[#10681](https://github.com/NVIDIA/spark-rapids/pull/10681)|Update rapids JNI dependency to 24.04.0, private to 24.04.1|
docs/archive.md (46 additions, 0 deletions)
@@ -5,6 +5,52 @@ nav_order: 15
---
Below are archived releases for RAPIDS Accelerator for Apache Spark.

## Release v24.04.0
### Hardware Requirements:

The plugin is tested on the following architectures:
@@ -67,14 +67,14 @@ for your hardware's minimum driver version.
### RAPIDS Accelerator's Support Policy for Apache Spark
The RAPIDS Accelerator maintains support for Apache Spark versions available for download from [Apache Spark](https://spark.apache.org/downloads.html)

### Download RAPIDS Accelerator for Apache Spark v24.04.0

| Processor | Scala Version | Download Jar | Download Signature |
|-----------|---------------|--------------|--------------------|
| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0.jar.asc) |
| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0.jar.asc) |
| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.04.0/rapids-4-spark_2.12-24.04.0-cuda11-arm64.jar.asc) |
| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.04.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.04.0/rapids-4-spark_2.13-24.04.0-cuda11-arm64.jar.asc) |

This package is built against CUDA 11.8. It is tested on V100, T4, A10, A100, L4 and H100 GPUs with
CUDA 11.8 through CUDA 12.0.

### Verify signature
* Download the [PUB_KEY](https://keys.openpgp.org/[email protected]).
* Import the public key: `gpg --import PUB_KEY`
* Verify the signature for the Scala 2.12 jar:
`gpg --verify rapids-4-spark_2.12-24.04.0.jar.asc rapids-4-spark_2.12-24.04.0.jar`
* Verify the signature for the Scala 2.13 jar:
`gpg --verify rapids-4-spark_2.13-24.04.0.jar.asc rapids-4-spark_2.13-24.04.0.jar`

The expected output of the signature verification:

gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) <[email protected]>"

### Release Notes
* New functionality and performance improvements for this release include:
* Performance improvements for S3 reading.
Refer to perfio.s3.enabled in [advanced_configs](./additional-functionality/advanced_configs.md) for more details; a configuration sketch follows these notes.
* Performance improvements for joins on unique keys.
* Enhanced decompression kernels for zstd and snappy.
* Enhanced Parquet reading performance with modular kernels.
* Added compatibility with Spark version 3.5.1.
* Deprecated support for Databricks 10.4 ML LTS.
* For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases).
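
A minimal sketch of what enabling that S3 read path can look like when building a RAPIDS-accelerated session. The exact configuration key lives in the advanced configs page linked above; `spark.rapids.perfio.s3.enabled` below is an assumed spelling, so verify it there before use:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: start a session with the RAPIDS Accelerator plugin and the
// S3 perf-IO reader turned on. "spark.rapids.perfio.s3.enabled" is an
// assumed key name; advanced_configs.md is the authoritative reference.
val spark = SparkSession.builder()
  .appName("rapids-s3-read")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.perfio.s3.enabled", "true")
  .getOrCreate()
```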

For a detailed list of changes, please refer to the
[CHANGELOG](https://github.com/NVIDIA/spark-rapids/blob/main/CHANGELOG.md).

## Release v24.02.0
### Hardware Requirements:

pom.xml (1 addition, 1 deletion)
@@ -699,7 +699,7 @@
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>24.04.0</spark-rapids-jni.version>
<spark-rapids-private.version>24.04.2</spark-rapids-private.version>
<spark-rapids-private.version>24.04.3</spark-rapids-private.version>
<scala.binary.version>2.12</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
scala2.13/pom.xml (1 addition, 1 deletion)
@@ -699,7 +699,7 @@
<cuda.version>cuda11</cuda.version>
<jni.classifier>${cuda.version}</jni.classifier>
<spark-rapids-jni.version>24.04.0</spark-rapids-jni.version>
<spark-rapids-private.version>24.04.2</spark-rapids-private.version>
<spark-rapids-private.version>24.04.3</spark-rapids-private.version>
<scala.binary.version>2.13</scala.binary.version>
<alluxio.client.version>2.8.0</alluxio.client.version>
<scala.recompileMode>incremental</scala.recompileMode>
GpuAggregateInPandasExec.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
import org.apache.spark.sql.rapids.execution.python.shims.GpuGroupedPythonRunnerFactory
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -109,8 +109,6 @@ case class GpuAggregateInPandasExec(
val (mNumInputRows, mNumInputBatches, mNumOutputRows, mNumOutputBatches) = commonGpuMetrics()

lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
val childOutput = child.output
val resultExprs = resultExpressions

@@ -204,27 +202,22 @@
}
}

val runnerFactory = GpuGroupedPythonRunnerFactory(conf, pyFuncs, argOffsets,
aggInputSchema, DataTypeUtilsShim.fromAttributes(pyOutAttributes),
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)

// Third, sends to Python to execute the aggregate and returns the result.
if (pyInputIter.hasNext) {
// Launch Python workers only when the data is not empty.
val pyRunner = new GpuArrowPythonRunner(
pyFuncs,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
argOffsets,
aggInputSchema,
sessionLocalTimeZone,
pythonRunnerConf,
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
DataTypeUtilsShim.fromAttributes(pyOutAttributes))

val pyRunner = runnerFactory.getRunner()
val pyOutputIterator = pyRunner.compute(pyInputIter, context.partitionId(), context)

val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes
val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs)
// Gets the combined batch for each group and projects for the output.
new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner,
mNumOutputRows, mNumOutputBatches).map { combinedBatch =>
new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator,
pyRunner.asInstanceOf[GpuArrowOutput], mNumOutputRows,
mNumOutputBatches).map { combinedBatch =>
withResource(combinedBatch) { batch =>
GpuProjectExec.project(batch, resultRefs)
}
GpuFlatMapGroupsInPandasExec.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
@@ -123,7 +123,8 @@ case class GpuFlatMapGroupsInPandasExec(
resolveArgOffsets(child, groupingAttributes)

val runnerFactory = GpuGroupedPythonRunnerFactory(conf, chainedFunc, Array(argOffsets),
DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema)
DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)

// Start processing. Map grouped batches to ArrowPythonRunner results.
child.executeColumnar().mapPartitionsInternal { inputIter =>
GpuGroupedPythonRunnerFactory.scala
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,7 +39,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution.python.shims

import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -49,14 +49,15 @@ case class GpuGroupedPythonRunnerFactory(
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)

def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
@@ -65,4 +66,4 @@
Int.MaxValue,
pythonOutputSchema)
}
}
}
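
The thread running through these Scala changes: `GpuGroupedPythonRunnerFactory` previously hard-coded `PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF`, so `GpuAggregateInPandasExec` had to build its own `GpuArrowPythonRunner`. With the new `evalType` parameter, both exec nodes share the factory and simply pass the eval type they need. A minimal sketch of that distinction, simplified from the diff rather than copied from the shipped code:

```scala
import org.apache.spark.api.python.PythonEvalType

// Sketch: the eval type each exec node now hands to the shared factory.
//   GpuAggregateInPandasExec     -> grouped-aggregate Pandas UDF
//   GpuFlatMapGroupsInPandasExec -> grouped-map Pandas UDF
def evalTypeFor(isGroupedAggregate: Boolean): Int =
  if (isGroupedAggregate) PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
  else PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
```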
GpuGroupedPythonRunnerFactory.scala
@@ -20,7 +20,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution.python.shims

import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -30,7 +30,8 @@ case class GpuGroupedPythonRunnerFactory(
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
// Configs from DB runtime
val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
@@ -41,7 +42,7 @@
if (zeroConfEnabled && maxBytes > 0L) {
new GpuGroupUDFArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
@@ -52,7 +53,7 @@
} else {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
GpuGroupedPythonRunnerFactory.scala
@@ -24,24 +24,25 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

//TODO is this needed? we already have a similar version in spark330db
case class GpuGroupedPythonRunnerFactory(
conf: org.apache.spark.sql.internal.SQLConf,
chainedFunc: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
dedupAttrs: StructType,
pythonOutputSchema: StructType) {
pythonOutputSchema: StructType,
evalType: Int) {
// Configs from DB runtime
val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
val isArrowBatchSlicingEnabled = conf.pythonArrowBatchSlicingEnabled
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)

def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
if (zeroConfEnabled && maxBytes > 0L) {
if (isArrowBatchSlicingEnabled || (zeroConfEnabled && maxBytes > 0L)) {
new GpuGroupUDFArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
Expand All @@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
} else {
new GpuArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
evalType,
argOffsets,
dedupAttrs,
sessionLocalTimeZone,
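
The Databricks shims add one more wrinkle: the last one above picks `GpuGroupUDFArrowPythonRunner` not only on the zero-conf conversion path but also when Arrow batch slicing is enabled. A condensed view of that selection predicate, using the names from the diff (a sketch, not the shipped code):

```scala
// Sketch of the runner choice in the shim above: true selects
// GpuGroupUDFArrowPythonRunner, false selects GpuArrowPythonRunner.
def usesGroupUdfRunner(
    isArrowBatchSlicingEnabled: Boolean,
    zeroConfEnabled: Boolean,
    maxBytes: Long): Boolean =
  isArrowBatchSlicingEnabled || (zeroConfEnabled && maxBytes > 0L)
```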