Skip to content

Commit

Permalink
Merge pull request #141 from NvTimLiu/merge-branch-24.12-to-main
Browse files Browse the repository at this point in the history
Merge branch-24.12 into main
  • Loading branch information
nvauto authored Dec 8, 2024
2 parents 3e24d01 + fa7a866 commit de7ca19
Show file tree
Hide file tree
Showing 632 changed files with 20,234 additions and 5,276 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/blossom-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ jobs:
github.actor == 'Feng-Jiang28' ||
github.actor == 'SurajAralihalli' ||
github.actor == 'jihoonson' ||
github.actor == 'ustcfy'
github.actor == 'ustcfy' ||
github.actor == 'knoguchi22'
)
steps:
- name: Check if comment is issued by authorized person
Expand Down
8 changes: 2 additions & 6 deletions .github/workflows/mvn-verify-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,10 @@ jobs:
echo "Generated Scala 2.13 build files don't match what's in repository"
exit 1
fi
# change to Scala 2.13 Directory
cd scala2.13
# test command, will retry for 3 times if failed.
max_retry=3; delay=30; i=1
while true; do
mvn package \
mvn package -f scala2.13/ \
-pl integration_tests,tests,tools -am -P 'individual,pre-merge' \
-Dbuildver=${{ matrix.spark-version }} -Dmaven.scalastyle.skip=true \
-Drat.skip=true ${{ env.COMMON_MVN_FLAGS }} && break || {
Expand Down Expand Up @@ -303,12 +301,10 @@ jobs:
echo "Generated Scala 2.13 build files don't match what's in repository"
exit 1
fi
# change to Scala 2.13 Directory
cd scala2.13
# test command, will retry for 3 times if failed.
max_retry=3; delay=30; i=1
while true; do
mvn verify \
mvn verify -f scala2.13/ \
-P "individual,pre-merge,source-javadoc" -Dbuildver=${{ matrix.spark-version }} \
${{ env.COMMON_MVN_FLAGS }} && break || {
if [[ $i -le $max_retry ]]; then
Expand Down
374 changes: 164 additions & 210 deletions CHANGELOG.md

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ mvn -pl dist -PnoSnapshots package -DskipTests
Verify that shim-specific classes are hidden from a conventional classloader.

```bash
$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl
$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl
Error: class not found: com.nvidia.spark.rapids.shims.SparkShimImpl
```

However, its bytecode can be loaded if prefixed with `spark3XY` not contained in the package name

```bash
$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar spark320.com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
Warning: File dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar(/spark320/com/nvidia/spark/rapids/shims/SparkShimImpl.class) does not contain class spark320.com.nvidia.spark.rapids.shims.SparkShimImpl
$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar spark320.com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
Warning: File dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar(/spark320/com/nvidia/spark/rapids/shims/SparkShimImpl.class) does not contain class spark320.com.nvidia.spark.rapids.shims.SparkShimImpl
Compiled from "SparkShims.scala"
public final class com.nvidia.spark.rapids.shims.SparkShimImpl {
```
Expand Down Expand Up @@ -178,7 +178,7 @@ mvn package -pl dist -am -Dbuildver=340 -DallowConventionalDistJar=true
Verify `com.nvidia.spark.rapids.shims.SparkShimImpl` is conventionally loadable:
```bash
$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
Compiled from "SparkShims.scala"
public final class com.nvidia.spark.rapids.shims.SparkShimImpl {
```
Expand Down
117 changes: 117 additions & 0 deletions DF_UDF_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Scala / Java UDFS implemented using data frame

User Defined Functions (UDFs) are used for a number of reasons in Apache Spark. Much of the time it is to implement
logic that is either very difficult or impossible to implement using existing SQL/Dataframe APIs directly. But they
are also used as a way to standardize processing logic across an organization or for code reused.

But UDFs come with some downsides. The biggest one is visibility into the processing being done. SQL is a language that
can be highly optimized. But a UDF in most cases is a black box, that the SQL optimizer cannot do anything about.
This can result in less than ideal query planning. Additionally, accelerated execution environments, like the
RAPIDS Accelerator for Apache Spark have no easy way to replace UDFs with accelerated versions, which can result in
slow performance.

This attempts to add visibility to the code reuse use case by providing a way to implement a UDF in terms of dataframe
commands.

## Setup

The dataframe UDF plugin is packaged in the same jar as the RAPIDS Accelerator for Apache Spark. This jar will need to
be added as a compile time dependency for code that wants to use this feature as well as adding the jar to your Spark
classpath just like you would do for GPU acceleration.

If you plan to not use the GPU accelerated processing, but still want dataframe UDF support on CPU applications then
add `com.nvidia.spark.DFUDFPlugin` to the `spark.sql.extensions` config. If you do use GPU accelerated processing
the RAPIDS Plugin will enable this automatically. You don't need to set the `spark.sql.extensions` config, but it
won't hurt anything if you do add it. Now you can implement a UDF in terms of Dataframe operations.

## Usage

```scala
import com.nvidia.spark.functions._

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val sum_array = df_udf((longArray: Column) =>
aggregate(longArray,
lit(0L),
(a, b) => coalesce(a, lit(0L)) + coalesce(b, lit(0L)),
a => a))
spark.udf.register("sum_array", sum_array)
```

You can then use `sum_array` however you would have used any other UDF. This allows you to provide a drop in replacement
implementation of an existing UDF.

```scala
Seq(Array(1L, 2L, 3L)).toDF("data").selectExpr("sum_array(data) as result").show()

+------+
|result|
+------+
| 6|
+------+
```

Java APIs are also supported and should work the same as Spark's UDFs

```java
import com.nvidia.spark.functions.df_udf

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;


UserDefinedFunction myAdd = df_udf((Column lhs, Column rhs) -> lhs + rhs)
spark.udf().register("myadd", myAdd)

spark.sql("SELECT myadd(1, 1) as r").show();
// +--+
// | r|
// +--+
// | 2|
// +--+

```

## Type Checks

DataFrame APIs do not provide type safety when writing the code and that is the same here. There are no builtin type
checks for inputs yet. Also, because of how types are resolved in Spark there is no way to adjust the query based on
the types passed in. Type checks are handled by the SQL planner/optimizer after the UDF has been replaced. This means
that the final SQL will not violate any type safety, but it also means that the errors might be confusing. For example,
if I passed in an `ARRAY<DOUBLE>` to `sum_array` instead of an `ARRAY<LONG>` I would get an error like

```scala
Seq(Array(1.0, 2.0, 3.0)).toDF("data").selectExpr("sum_array(data) as result").show()
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "aggregate(data, 0, lambdafunction((coalesce(namedlambdavariable(), 0) + coalesce(namedlambdavariable(), 0)), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))" due to data type mismatch: Parameter 3 requires the "BIGINT" type, however "lambdafunction((coalesce(namedlambdavariable(), 0) + coalesce(namedlambdavariable(), 0)), namedlambdavariable(), namedlambdavariable())" has the type "DOUBLE".; line 1 pos 0;
Project [aggregate(data#46, 0, lambdafunction((cast(coalesce(lambda x_9#49L, 0) as double) + coalesce(lambda y_10#50, cast(0 as double))), lambda x_9#49L, lambda y_10#50, false), lambdafunction(lambda x_11#51L, lambda x_11#51L, false)) AS result#48L]
+- Project [value#43 AS data#46]
+- LocalRelation [value#43]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:269)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
```

Which is not as simple to understand as a normal UDF.

```scala
val sum_array = udf((a: Array[Long]) => a.sum)

spark.udf.register("sum_array", sum_array)

Seq(Array(1.0, 2.0, 3.0)).toDF("data").selectExpr("sum_array(data) as result").show()
org.apache.spark.sql.AnalysisException: [CANNOT_UP_CAST_DATATYPE] Cannot up cast array element from "DOUBLE" to "BIGINT".
The type path of the target object is:
- array element class: "long"
- root class: "[J"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object
at org.apache.spark.sql.errors.QueryCompilationErrors$.upCastFailureError(QueryCompilationErrors.scala:285)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3646)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3677)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3654)
```

We hope to add optional type checks in the future.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ as a `provided` dependency.
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark_2.12</artifactId>
<version>24.10.1</version>
<version>24.12.0</version>
<scope>provided</scope>
</dependency>
```
Loading

0 comments on commit de7ca19

Please sign in to comment.