
Commit

Merge remote-tracking branch 'NVDA/branch-24.12' into shuffle-coalesce-split-retry

Signed-off-by: Firestarman <[email protected]>
firestarman committed Nov 5, 2024
2 parents 4ac3373 + 5afee5b commit 369d5c6
Showing 305 changed files with 4,087 additions and 1,777 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/mvn-verify-check.yml
@@ -246,12 +246,10 @@ jobs:
echo "Generated Scala 2.13 build files don't match what's in repository"
exit 1
fi
# change to Scala 2.13 Directory
cd scala2.13
# test command, will retry for 3 times if failed.
max_retry=3; delay=30; i=1
while true; do
mvn package \
mvn package -f scala2.13/ \
-pl integration_tests,tests,tools -am -P 'individual,pre-merge' \
-Dbuildver=${{ matrix.spark-version }} -Dmaven.scalastyle.skip=true \
-Drat.skip=true ${{ env.COMMON_MVN_FLAGS }} && break || {
@@ -303,12 +301,10 @@ jobs:
echo "Generated Scala 2.13 build files don't match what's in repository"
exit 1
fi
# change to Scala 2.13 Directory
cd scala2.13
# test command, will retry for 3 times if failed.
max_retry=3; delay=30; i=1
while true; do
mvn verify \
mvn verify -f scala2.13/ \
-P "individual,pre-merge,source-javadoc" -Dbuildver=${{ matrix.spark-version }} \
${{ env.COMMON_MVN_FLAGS }} && break || {
if [[ $i -le $max_retry ]]; then
286 changes: 163 additions & 123 deletions CHANGELOG.md

Large diffs are not rendered by default.

35 changes: 31 additions & 4 deletions df_udf/README.md → DF_UDF_README.md
@@ -15,9 +15,14 @@ commands.

## Setup

To do this include com.nvidia:df_udf_plugin as a dependency for your project and also include it on the
classpath for your Apache Spark environment. Then include `com.nvidia.spark.DFUDFPlugin` in the config
`spark.sql.extensions`. Now you can implement a UDF in terms of Dataframe operations.
The dataframe UDF plugin is packaged in the same jar as the RAPIDS Accelerator for Apache Spark. This jar needs to be
added as a compile-time dependency for code that uses this feature, and it also needs to be on your Spark classpath,
just like it would be for GPU acceleration.
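
For example, with sbt the dependency could be declared roughly as follows (a sketch; the version shown and the
`provided` scope are assumptions, so substitute the release and Scala suffix you actually use):

```scala
// build.sbt sketch: the RAPIDS Accelerator dist jar also carries the dataframe UDF plugin.
// "24.10.0" and the _2.12 suffix are examples only -- match the release and Scala version of your cluster.
libraryDependencies += "com.nvidia" % "rapids-4-spark_2.12" % "24.10.0" % "provided"
```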

If you do not plan to use GPU-accelerated processing, but still want dataframe UDF support in CPU-only applications,
add `com.nvidia.spark.DFUDFPlugin` to the `spark.sql.extensions` config. If you do use GPU-accelerated processing, the
RAPIDS Plugin enables this automatically. You don't need to set the `spark.sql.extensions` config, but it does no harm
if you do. Now you can implement a UDF in terms of Dataframe operations.
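
As a minimal sketch of the CPU-only setup (assuming the plugin jar is already on the classpath), the extension can be
enabled when the session is built:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable the dataframe UDF plugin for a CPU-only application.
// With GPU acceleration enabled, the RAPIDS Plugin registers this extension for you.
val spark = SparkSession.builder()
  .appName("df-udf-example")
  .config("spark.sql.extensions", "com.nvidia.spark.DFUDFPlugin")
  .getOrCreate()
```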

## Usage

@@ -48,6 +53,28 @@ Seq(Array(1L, 2L, 3L)).toDF("data").selectExpr("sum_array(data) as result").show
+------+
```
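
The definition of `sum_array` is elided from this hunk; a sketch of what such a df_udf can look like (an assumption
based on the usage above, not the exact code from the README) is:

```scala
import com.nvidia.spark.functions.df_udf

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Sketch: a UDF expressed purely in terms of dataframe operations, summing the
// elements of an array column, then registered like any other Spark UDF.
val sum_array = df_udf((longArray: Column) =>
  aggregate(longArray, lit(0L), (acc, x) => acc + x))
spark.udf.register("sum_array", sum_array)
```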

Java APIs are also supported and should work the same way as Spark's UDFs.

```java
import static com.nvidia.spark.functions.df_udf; // static import so df_udf can be called unqualified below

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;


UserDefinedFunction myAdd = df_udf((Column lhs, Column rhs) -> lhs.plus(rhs)); // Column has no '+' operator in Java
spark.udf().register("myadd", myAdd);

spark.sql("SELECT myadd(1, 1) as r").show();
// +--+
// | r|
// +--+
// | 2|
// +--+

```

## Type Checks

DataFrame APIs do not provide type safety when writing the code, and that is the same here. There are no builtin type
Expand Down Expand Up @@ -87,4 +114,4 @@ at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3654)
```

We hope to add optional type checks in the future.
2 changes: 1 addition & 1 deletion README.md
@@ -32,7 +32,7 @@ or answer a question on the [discussion board](https://github.com/NVIDIA/spark-r

## Download

The jar files for the most recent release can be retrieved from the [download](docs/download.md)
The jar files for the most recent release can be retrieved from the [download](https://nvidia.github.io/spark-rapids/docs/download.html)
page.

## Building From Source
17 changes: 17 additions & 0 deletions aggregator/pom.xml
@@ -745,6 +745,23 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>release353</id>
<activation>
<property>
<name>buildver</name>
<value>353</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-delta-stub_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
</dependencies>
</profile>
<!-- #if scala-2.13 --><!--
<profile>
<id>release400</id>
7 changes: 2 additions & 5 deletions build/buildall
@@ -86,7 +86,7 @@ function bloopInstall() {

function versionsFromDistProfile() {
[[ "$BUILD_ALL_DEBUG" == "1" ]] && set -x
versionRawStr=$(mvn -B help:evaluate -q -pl dist -P"$1" -Dexpression=included_buildvers -DforceStdout)
versionRawStr=$($MVN -B help:evaluate -q -pl dist -P"$1" -Dexpression=included_buildvers -DforceStdout)
versionStr=${versionRawStr//[$'\n',]/}
echo -n $versionStr
}
@@ -171,6 +171,7 @@ fi
export MVN="mvn -Dmaven.wagon.http.retryHandler.count=3 ${MVN_OPT}"

if [[ "$SCALA213" == "1" ]]; then
MVN="$MVN -f scala2.13/"
DIST_PROFILE=${DIST_PROFILE:-"noSnapshotsScala213"}
$(dirname $0)/make-scala-version-build-files.sh 2.13
else
@@ -234,10 +235,6 @@ if [[ "$SKIP_CLEAN" != "1" ]]; then
$MVN -q clean
fi

if [[ "$SCALA213" == "1" ]]; then
cd scala2.13
fi

echo "Building a combined dist jar with Shims for ${SPARK_SHIM_VERSIONS[@]} ..."

function build_single_shim() {
44 changes: 33 additions & 11 deletions build/shimplify.py
@@ -84,6 +84,7 @@
import os
import re
import subprocess
from functools import partial


def __project():
@@ -199,7 +200,9 @@ def __csv_as_arr(str_val):
__shim_comment_pattern = re.compile(re.escape(__opening_shim_tag) +
r'\n(.*)\n' +
re.escape(__closing_shim_tag), re.DOTALL)

__spark_version_classifier = '$_spark.version.classifier_'
__spark_version_placeholder = re.escape(__spark_version_classifier)
__package_pattern = re.compile('package .*' + '(' + __spark_version_placeholder + ')')
def __upsert_shim_json(filename, bv_list):
with open(filename, 'r') as file:
contents = file.readlines()
@@ -365,10 +368,7 @@ def __generate_symlinks():
__log.info("# generating symlinks for shim %s %s files", buildver, src_type)
__traverse_source_tree_of_all_shims(
src_type,
lambda src_type, path, build_ver_arr: __generate_symlink_to_file(buildver,
src_type,
path,
build_ver_arr))
partial(__generate_symlink_to_file, buildver=buildver, src_type=src_type))

def __traverse_source_tree_of_all_shims(src_type, func):
"""Walks src/<src_type>/sparkXYZ"""
@@ -392,11 +392,10 @@ def __traverse_source_tree_of_all_shims(src_type, func):
build_ver_arr = map(lambda x: str(json.loads(x).get('spark')), shim_arr)
__log.debug("extracted shims %s", build_ver_arr)
assert build_ver_arr == sorted(build_ver_arr),\
"%s shim list is not properly sorted" % shim_file_path
func(src_type, shim_file_path, build_ver_arr)

"%s shim list is not properly sorted: %s" % (shim_file_path, build_ver_arr)
func(shim_file_path=shim_file_path, build_ver_arr=build_ver_arr, shim_file_txt=shim_file_txt)

def __generate_symlink_to_file(buildver, src_type, shim_file_path, build_ver_arr):
def __generate_symlink_to_file(buildver, src_type, shim_file_path, build_ver_arr, shim_file_txt):
if buildver in build_ver_arr:
project_base_dir = str(__project().getBaseDir())
base_dir = __src_basedir
@@ -416,9 +415,32 @@ def __generate_symlink_to_file(buildver, src_type, shim_file_path, build_ver_arr
target_shim_file_path = os.path.join(target_root, target_rel_path)
__log.debug("creating symlink %s -> %s", target_shim_file_path, shim_file_path)
__makedirs(os.path.dirname(target_shim_file_path))
if __should_overwrite:
package_match = __package_pattern.search(shim_file_txt)
if __should_overwrite or package_match:
__remove_file(target_shim_file_path)
__symlink(shim_file_path, target_shim_file_path)
if package_match:
with open(target_shim_file_path, mode='w') as f:
f.write(shim_file_txt[0:package_match.start(1)])
f.write("spark")
f.write(buildver)
f.write('\n')
f.write('''
/*
!!! DO NOT EDIT THIS FILE !!!
This file has been generated from the original
%s
by interpolating $_spark.version.classifier_=%s
Be sure to edit the original file if required
*/
''' % (shim_file_path, 'spark' + buildver))
f.write(shim_file_txt[package_match.end(1):])
else:
__symlink(shim_file_path, target_shim_file_path)


def __symlink(src, target):
@@ -38,6 +38,7 @@
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
{"spark": "353"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.tests.datagen

88 changes: 0 additions & 88 deletions df_udf/pom.xml

This file was deleted.

1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -129,6 +129,7 @@ Name | Description | Default Value | Applicable at
<a name="sql.join.leftOuter.enabled"></a>spark.rapids.sql.join.leftOuter.enabled|When set to true left outer joins are enabled on the GPU|true|Runtime
<a name="sql.join.leftSemi.enabled"></a>spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true|Runtime
<a name="sql.join.rightOuter.enabled"></a>spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true|Runtime
<a name="sql.json.read.datetime.enabled"></a>spark.rapids.sql.json.read.datetime.enabled|JSON reading is not 100% compatible when reading dates and timestamps.|false|Runtime
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|When reading a quoted string as a decimal Spark supports reading non-ascii unicode digits, and the RAPIDS Accelerator does not.|true|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
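
The new `spark.rapids.sql.json.read.datetime.enabled` entry above is a runtime config that defaults to `false`, so
opting in can be done on a live session; a minimal sketch:

```scala
// Sketch: opt in to GPU JSON date/timestamp reading despite the documented
// compatibility caveats. The config is marked Runtime, so it can be set on an
// existing SparkSession.
spark.conf.set("spark.rapids.sql.json.read.datetime.enabled", "true")
```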
