
[auto-merge] branch-23.10 to branch-23.12 [skip ci] [bot] #324

Merged 1 commit into branch-23.12 from branch-23.10 on Oct 24, 2023
109 changes: 81 additions & 28 deletions examples/UDF-Examples/RAPIDS-accelerated-UDFs/README.md
@@ -1,8 +1,30 @@
# RAPIDS Accelerated UDF Examples

This project contains sample implementations of RAPIDS accelerated user-defined functions.

The ideal solution would be to replace the UDF with a series of DataFrame or SQL operations. If
that is not possible, we also provide
a [UDF compiler extension](https://nvidia.github.io/spark-rapids/docs/additional-functionality/udf-to-catalyst-expressions.html)
to translate UDFs to Catalyst expressions. The extension only supports compiling simple
operations; for complicated cases, you can implement a RAPIDS accelerated UDF instead.
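
For instance, a trivial UDF can often be replaced with an equivalent built-in expression, which
Catalyst can optimize and the RAPIDS plugin can run on the GPU without any UDF machinery. A minimal
sketch (the DataFrame and column names are hypothetical; assumes a `spark-shell` session `spark`):

```scala
import org.apache.spark.sql.functions._

val df = spark.range(10).toDF("x")

// Wrapping trivial logic in a UDF hides it from Catalyst and the GPU plugin:
val plusOne = udf((x: Long) => x + 1)
df.select(plusOne(col("x"))).show()

// The equivalent built-in expression is transparent to the optimizer:
df.select(col("x") + 1).show()
```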

## Spark Scala UDF Examples

[URLDecode](src/main/scala/com/nvidia/spark/rapids/udf/scala/URLDecode.scala)
is the simplest demo for getting started. In the code you can see there is an original CPU
implementation provided by the `apply` method. We only need to implement the `RapidsUDF` interface,
which provides a single method to override, `evaluateColumnar`. The CPU URLDecode function
processes the input row by row, while the GPU `evaluateColumnar` returns a cudf `ColumnVector`,
because the GPU gets its speed by performing operations on many rows at a time. In
`evaluateColumnar` we leverage an existing cudf implementation of URL decoding, so we don't need to
write any native C++ code. This is all done through
the [Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable). The benefit of
implementing via the Java API is ease of development, but the memory model is not friendly for GPU
operations because the JVM assumes everything is in heap memory; we need to free GPU resources in a
timely manner with try-finally blocks. Note that we need to implement both the CPU and GPU
functions so the UDF will still work if a higher-level operation involving the RAPIDS accelerated
UDF falls back to the CPU.
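
A minimal sketch of this pattern, assuming a toy "to upper" UDF rather than the actual URLDecode
source, and assuming cudf's `strip()`/`upper()` string APIs for illustration:

```scala
import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.RapidsUDF

class ToUpper extends Function1[String, String] with RapidsUDF with Serializable {
  // CPU row-by-row implementation, used when the plan falls back to the CPU
  override def apply(s: String): String =
    if (s == null) null else s.toUpperCase

  // GPU implementation: processes the whole batch through the cudf Java API.
  // Intermediate columns live in GPU memory, not the JVM heap, so close them
  // promptly in a finally block instead of waiting for the garbage collector.
  override def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = {
    require(args.length == 1, s"Unexpected argument count: ${args.length}")
    val stripped = args(0).strip()
    try {
      stripped.upper()
    } finally {
      stripped.close()
    }
  }
}
```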

- [URLDecode](src/main/scala/com/nvidia/spark/rapids/udf/scala/URLDecode.scala)
decodes URL-encoded strings using the
[Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable)
@@ -12,6 +34,23 @@ This project contains sample implementations of RAPIDS accelerated user-defined

## Spark Java UDF Examples

Below are some examples of implementing a RAPIDS accelerated Java UDF via JNI and native code. If
there is no existing Java API we can leverage, we can write custom native code.
Take [CosineSimilarity](src/main/java/com/nvidia/spark/rapids/udf/java/CosineSimilarity.java) as an
example: the Java class for the UDF is similar to the previous URLDecode/URLEncode demos, but we
implement the cosineSimilarity function in C++ and cross into the native code as quickly as
possible, because it is easier to write the code safely there. The native code `reinterpret_cast`s
the inputs to column views, does some sanity checking, converts them to list column views, computes
the cosine similarity, and finally releases the unique pointer to the result column and returns its
address; on the Java side we wrap that address in a column vector and own the resource.
In `cosine_similarity.cu` the computation is implemented as the actual CUDA kernel, where we can
leverage the [Thrust template library](https://docs.nvidia.com/cuda/thrust/index.html) to write
standard GPU-parallel algorithms. The benefit of implementing the UDF in native code is maximum
control over GPU memory utilization and performance; the trade-off is a more complicated build
environment, as we need to build against libcudf, with significantly longer build times.
Implementing a RAPIDS accelerated UDF in native code is a significant effort.
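
A sketch of the JVM side of this JNI pattern (the class is hypothetical; the loader call and the
native-view/address handshake mirror the CosineSimilarity example, but the loader's package path is
an assumption):

```scala
import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.udf.NativeUDFExamplesLoader

class CosineSimilaritySketch {
  // Implemented in C++: takes two native column-view addresses and returns
  // the address of a newly allocated libcudf column.
  @native private def cosineSimilarity(view1: Long, view2: Long): Long

  def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = {
    // Load the native library lazily; the driver may not have a CUDA environment.
    NativeUDFExamplesLoader.ensureLoaded()
    // Wrap the returned address in a ColumnVector, which takes ownership of the
    // column and frees the device memory when closed.
    new ColumnVector(cosineSimilarity(args(0).getNativeView, args(1).getNativeView))
  }
}
```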

- [URLDecode](src/main/java/com/nvidia/spark/rapids/udf/java/URLDecode.java)
decodes URL-encoded strings using the
[Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable)
@@ -24,6 +63,8 @@ This project contains sample implementations of RAPIDS accelerated user-defined

## Hive UDF Examples

Below are some examples of implementing RAPIDS accelerated Hive UDFs using the cudf Java APIs and
native code; a minimal sketch of the simple UDF pattern follows the list below.

- [URLDecode](src/main/java/com/nvidia/spark/rapids/udf/hive/URLDecode.java)
implements a Hive simple UDF using the
[Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable)
@@ -37,77 +78,91 @@ This project contains sample implementations of RAPIDS accelerated user-defined
[native code](src/main/cpp/src) to count words in strings
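
As referenced above, here is a minimal sketch of the Hive simple UDF pattern, written in Scala for
consistency with the sketches above (the repo's versions are Java); the "to upper" logic and the
cudf `upper()` call are illustrative assumptions:

```scala
import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.RapidsUDF
import org.apache.hadoop.hive.ql.exec.UDF

class ToUpperHiveUDF extends UDF with RapidsUDF {
  // Hive calls this row by row on the CPU
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase

  // The RAPIDS plugin calls this with whole columns on the GPU
  override def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = {
    require(args.length == 1, s"Unexpected argument count: ${args.length}")
    args(0).upper()
  }
}
```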

## Building and running the tests without Native Code Examples

Some UDF examples use native code in their implementation. Building the native code requires a
libcudf build environment, so these examples do not build by default.

### Prerequisites

Download [Apache Spark](https://spark.apache.org/downloads.html) and set the `SPARK_HOME`
environment variable. Install Python 3.8+, then install `pytest` and `sre_yield` using pip or
conda. For example:

```bash
export SPARK_HOME=path-to-spark
pip install pytest
pip install sre_yield
```

Run the following commands to build and run the tests:

```bash
cd spark-rapids-examples/examples/UDF-Examples/RAPIDS-accelerated-UDFs
mvn clean package
./run_pyspark_from_build.sh -m "not rapids_udf_example_native"
```

## Building with Native Code Examples and running test cases

The `udf-native-examples` Maven profile can be used to include the native UDF examples in the
build, i.e., specify `-Pudf-native-examples` on the `mvn` command line.

### Creating a libcudf Build Environment

Building the native code requires a libcudf build environment. The `Dockerfile` in this directory
can be used to set up a Docker image that provides one. This repository will either need to be
cloned or mounted into a container using that Docker image. The `Dockerfile` contains build
arguments to control the Linux version, CUDA version, and other settings; see the top of the
`Dockerfile` for details.

First install Docker and [nvidia-docker](https://github.com/NVIDIA/nvidia-docker).

Run the following commands to build and start a Docker container:

```bash
cd spark-rapids-examples/examples/UDF-Examples/RAPIDS-accelerated-UDFs
docker build -t my-local:my-udf-example-ubuntu .
nvidia-docker run -it my-local:my-udf-example-ubuntu
```

### Build the udf-examples jar

In the Docker container, clone the code and compile:

```bash
git clone https://github.com/NVIDIA/spark-rapids-examples.git
cd spark-rapids-examples/examples/UDF-Examples/RAPIDS-accelerated-UDFs
mvn clean package -Pudf-native-examples
```

The build can take a long time (e.g., 1.5 hours). The rapids-4-spark-udf-examples*.jar is then
generated under the RAPIDS-accelerated-UDFs/target directory.

### Run all the examples including native examples in the Docker container

Set the `SPARK_HOME` environment variable and install the Python dependencies as described in the
[Prerequisites section](#prerequisites) above:

```bash
export SPARK_HOME=path-to-spark
pip install pytest
pip install sre_yield
```

Run the following command to run the tests:

```bash
./run_pyspark_from_build.sh
```

## How to run the Native UDFs on Spark local mode

First finish the steps in the
[Building with Native Code Examples and running test cases](#building-with-native-code-examples-and-running-test-cases)
section, then do the following inside the Docker container.

### Get jars from Maven Central

[rapids-4-spark_2.12-23.08.1.jar](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/23.08.1/rapids-4-spark_2.12-23.08.1.jar)

### Launch a local mode Spark
@@ -151,5 +206,3 @@ spark.sql("CREATE TEMPORARY FUNCTION {} AS '{}'".format("wordcount", "com.nvidia
spark.sql("select wordcount(c1) from tab group by c1").show()
spark.sql("select wordcount(c1) from tab group by c1").explain()
```

Refer to [more Spark modes](../../../docs/get-started/xgboost-examples/on-prem-cluster) for
testing against other Spark deployment modes.
@@ -49,7 +49,7 @@ else
"$SCRIPTPATH"
"$SCRIPTPATH"/src/main/python)

# --ignore=target is used to exclude the target directory which contains unrelated python files.
TEST_COMMON_OPTS=(-v
-rfExXs
"$TEST_ARGS"
@@ -76,7 +76,7 @@ Java_com_nvidia_spark_rapids_udf_java_CosineSimilarity_cosineSimilarity(JNIEnv*
auto lv2 = cudf::lists_column_view(*v2);
std::unique_ptr<cudf::column> result = cosine_similarity(lv1, lv2);

// Release the unique_ptr and return the column address to Java, which takes
// ownership of the column and will release the underlying resources.
return reinterpret_cast<jlong>(result.release());
} catch (std::bad_alloc const& e) {
auto msg = std::string("Unable to allocate native memory: ") +
@@ -68,9 +68,12 @@ public ColumnVector evaluateColumnar(int numRows, ColumnVector... args) {

// Load the native code if it has not been already loaded. This is done here
// rather than in a static code block since the driver may not have the
// required CUDA environment.
NativeUDFExamplesLoader.ensureLoaded();

// We need to go into the native code as quickly as possible because it is
// easier to write the code safely there. The returned column address is then
// wrapped in a ColumnVector, which owns that resource.
return new ColumnVector(cosineSimilarity(args[0].getNativeView(), args[1].getNativeView()));
}
