
Merge branch 'branch-24.02' into parse_url_query2
thirtiseven committed Jan 23, 2024
2 parents e4c8e06 + 91364a9 commit 60566e0
Showing 39 changed files with 1,887 additions and 153 deletions.
2 changes: 1 addition & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -245,7 +245,7 @@
Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
<a name="sql.expression.GetArrayItem"></a>spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None|
<a name="sql.expression.GetArrayStructFields"></a>spark.rapids.sql.expression.GetArrayStructFields| |Extracts the `ordinal`-th fields of all array elements for the data with the type of array of struct|true|None|
<a name="sql.expression.GetJsonObject"></a>spark.rapids.sql.expression.GetJsonObject|`get_json_object`|Extracts a json object from path|true|None|
<a name="sql.expression.GetJsonObject"></a>spark.rapids.sql.expression.GetJsonObject|`get_json_object`|Extracts a json object from path|false|This is disabled by default because escape sequences are not processed correctly, the input is not validated, and the output is not normalized the same as Spark|
<a name="sql.expression.GetMapValue"></a>spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
<a name="sql.expression.GetStructField"></a>spark.rapids.sql.expression.GetStructField| |Gets the named field of the struct|true|None|
<a name="sql.expression.GetTimestamp"></a>spark.rapids.sql.expression.GetTimestamp| |Gets timestamps from strings using given pattern.|true|None|
38 changes: 38 additions & 0 deletions docs/compatibility.md
@@ -441,6 +441,44 @@
parse some variants of `NaN` and `Infinity` even when this option is disabled
([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with
Spark version 3.3.0 and later.

### get_json_object

The `GetJsonObject` operator takes a JSON formatted string and a JSON path string as input. The
code base for this is currently separate from the GPU parsing of JSON for files and
`FromJsonObject`, so the two can produce different results. Because of several incompatibilities
and bugs in the GPU version of `GetJsonObject`, it runs on the CPU by default. If you are aware
of the current limitations of the GPU version, you may see a significant performance speedup by
setting `spark.rapids.sql.expression.GetJsonObject` to `true` to enable it, as sketched below.
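
As a quick illustration, here is a minimal PySpark sketch of opting in (the session and
DataFrame below are hypothetical and not part of this change):

```python
# Minimal sketch: enable the GPU version of get_json_object despite the
# known limitations listed below.
from pyspark.sql import SparkSession
from pyspark.sql.functions import get_json_object

spark = (SparkSession.builder
         # Assumes the RAPIDS Accelerator plugin is already on the classpath.
         .config("spark.rapids.sql.expression.GetJsonObject", "true")
         .getOrCreate())

df = spark.createDataFrame([('{"store": {"name": "widgets"}}',)], ["json"])
# Extract a nested field with a JSON path.
df.select(get_json_object("json", "$.store.name").alias("name")).show()
```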

The following is a list of known differences.
* [No input validation](https://github.com/NVIDIA/spark-rapids/issues/10218). If the input string
  is not valid JSON, Apache Spark returns a null result, but the GPU version will still try to
  find a match.
* [Escapes are not properly processed for strings](https://github.com/NVIDIA/spark-rapids/issues/10196).
  When returning a result for a quoted string, Apache Spark removes the quotes and replaces any
  escape sequences with the proper characters, but this escape-sequence processing does not
  happen on the GPU (see the sketch after this list).
* [Invalid JSON paths could throw exceptions](https://github.com/NVIDIA/spark-rapids/issues/10212).
  If a JSON path is not valid, Apache Spark returns a null result, but the GPU version may throw
  an exception and fail the query.
* [Non-string output is not normalized](https://github.com/NVIDIA/spark-rapids/issues/10218).
  When returning a result for anything other than a string, Apache Spark normalizes the output in
  several ways that the GPU does not: removing unnecessary white space, parsing and then
  serializing floating-point numbers, turning single quotes into double quotes, and removing
  unneeded escapes for single quotes.
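
As a sketch of the escape handling difference (assuming `spark` is a session with the GPU
version enabled as above):

```python
# Hypothetical illustration of the escape-sequence difference described above.
from pyspark.sql.functions import get_json_object

# The JSON value contains the escape sequence \n inside a quoted string.
df = spark.createDataFrame([('{"a": "line1\\nline2"}',)], ["json"])
df.select(get_json_object("json", "$.a").alias("a")).show()
# CPU Spark decodes the escape and returns a string with a real newline;
# the GPU version currently returns the raw text line1\nline2.
```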

The following is a list of bugs in either the GPU version or arguably in Apache Spark itself.
* https://github.com/NVIDIA/spark-rapids/issues/10219 non-matching quotes in quoted strings
* https://github.com/NVIDIA/spark-rapids/issues/10213 array index notation works without a root
* https://github.com/NVIDIA/spark-rapids/issues/10214 unquoted array index notation is not
  supported
* https://github.com/NVIDIA/spark-rapids/issues/10215 leading spaces can be stripped from named
  keys
* https://github.com/NVIDIA/spark-rapids/issues/10216 Spark appears to flatten some output,
  which differs from other implementations, including the GPU version
* https://github.com/NVIDIA/spark-rapids/issues/10217 a JSON path execution bug
* https://issues.apache.org/jira/browse/SPARK-46761 Apache Spark does not allow the `?` character
  in a quoted JSON path string (see the sketch below)
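
As a sketch of the SPARK-46761 behavior (a hypothetical example, using the same `spark` session
as above):

```python
# Hypothetical illustration of SPARK-46761: a '?' inside a quoted JSON path.
from pyspark.sql.functions import get_json_object

df = spark.createDataFrame([('{"?": "question"}',)], ["json"])
# Apache Spark rejects the quoted path containing '?' and returns null,
# even though the key itself is valid JSON.
df.select(get_json_object("json", "$['?']").alias("q")).show()
```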

## Avro

The Avro format read is a very experimental feature which is expected to have some issues, so we disable
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -6856,7 +6856,7 @@
are limited.
<td rowSpan="3">GetJsonObject</td>
<td rowSpan="3">`get_json_object`</td>
<td rowSpan="3">Extracts a json object from path</td>
<td rowSpan="3">None</td>
<td rowSpan="3">This is disabled by default because escape sequences are not processed correctly, the input is not validated, and the output is not normalized the same as Spark</td>
<td rowSpan="3">project</td>
<td>json</td>
<td> </td>
4 changes: 3 additions & 1 deletion integration_tests/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020-2023, NVIDIA CORPORATION.
Copyright (c) 2020-2024, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -80,7 +80,9 @@
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<tarLongFileMode>posix</tarLongFileMode>
<finalName>rapids-4-spark-integration-tests_${scala.binary.version}-${project.version}-${spark.version.classifier}</finalName>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
7 changes: 6 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -333,10 +333,15 @@
EOF
--driver-class-path "${PYSP_TEST_spark_driver_extraClassPath}"
--conf spark.executor.extraClassPath="${PYSP_TEST_spark_driver_extraClassPath}"
)
elif [[ -n "$PYSP_TEST_spark_jars_packages" ]]; then
SPARK_SHELL_ARGS_ARR+=(--packages "${PYSP_TEST_spark_jars_packages}")
else
SPARK_SHELL_ARGS_ARR+=(--jars "${PYSP_TEST_spark_jars}")
fi

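# Optionally point dependency resolution for --packages at extra repositories.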
if [[ -n "$PYSP_TEST_spark_jars_repositories" ]]; then
SPARK_SHELL_ARGS_ARR+=(--repositories "${PYSP_TEST_spark_jars_repositories}")
fi
# NOTE grep is used not only for checking the output but also
# to workaround the fact that spark-shell catches all failures.
# In this test it exits not because of the failure but because it encounters
59 changes: 57 additions & 2 deletions integration_tests/src/main/python/aqe_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
from pyspark.sql.functions import when, col, current_date, current_timestamp
from pyspark.sql.types import *
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from conftest import is_not_utc
from conftest import is_databricks_runtime, is_not_utc
from data_gen import *
from marks import ignore_order, allow_non_gpu
from spark_session import with_cpu_session, is_databricks113_or_later
@@ -243,3 +243,58 @@ def do_it(spark):

assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf)


# This test specifically reproduces the issue found in
# https://github.com/NVIDIA/spark-rapids/issues/10165: an executor broadcast
# where the exchange feeding the BroadcastHashJoin has multiple partitions
# and goes through an AQEShuffleRead that uses CoalescePartitions to reduce
# it to a single partition.
db_113_cpu_bnlj_join_allow=["ShuffleExchangeExec"] if is_databricks113_or_later() else []
@ignore_order(local=True)
@pytest.mark.skipif(not is_databricks_runtime(),
                    reason="Executor side broadcast only supported on Databricks")
@allow_non_gpu('BroadcastHashJoinExec', 'ColumnarToRowExec', *db_113_cpu_bnlj_join_allow)
def test_aqe_join_executor_broadcast_not_single_partition(spark_tmp_path):
data_path = spark_tmp_path + '/PARQUET_DATA'
bhj_disable_conf = copy_and_update(_adaptive_conf,
{ "spark.rapids.sql.exec.BroadcastHashJoinExec": "false"})

def prep(spark):
data = [
(("Adam ", "", "Green"), "1", "M", 1000),
(("Bob ", "Middle", "Green"), "2", "M", 2000),
(("Cathy ", "", "Green"), "3", "F", 3000)
]
schema = (StructType()
.add("name", StructType()
.add("firstname", StringType())
.add("middlename", StringType())
.add("lastname", StringType()))
.add("id", StringType())
.add("gender", StringType())
.add("salary", IntegerType()))
        df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
        df.write.format("parquet").mode("overwrite").save(data_path)
        data_school = [
("1", "school1"),
("2", "school1"),
("3", "school2")
]
schema_school = (StructType()
.add("id", StringType())
.add("school", StringType()))
        df_school = spark.createDataFrame(spark.sparkContext.parallelize(data_school), schema_school)
df_school.createOrReplaceTempView("df_school")

with_cpu_session(prep)

def do_it(spark):
newdf = spark.read.parquet(data_path)
newdf.createOrReplaceTempView("df")
return spark.sql(
"""
select /*+ BROADCAST(df_school) */ * from df a left outer join df_school b on a.id == b.id
"""
)

assert_gpu_and_cpu_are_equal_collect(do_it, conf=bhj_disable_conf)

6 changes: 2 additions & 4 deletions integration_tests/src/main/python/date_time_test.py
@@ -291,7 +291,6 @@ def test_unsupported_fallback_to_unix_timestamp(data_gen):
unsupported_timezones = ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"]

@pytest.mark.parametrize('time_zone', supported_timezones, ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_from_utc_timestamp(time_zone):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, timestamp_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))
@@ -311,7 +310,6 @@ def test_unsupported_fallback_from_utc_timestamp():
"from_utc_timestamp(a, tzone)"),
'FromUTCTimestamp')

@allow_non_gpu(*non_utc_allow)
@pytest.mark.parametrize('time_zone', supported_timezones, ids=idfn)
def test_to_utc_timestamp(time_zone):
assert_gpu_and_cpu_are_equal_collect(
@@ -413,7 +411,7 @@ def invalid_date_string_df(spark):

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
@allow_non_gpu(*non_utc_tz_allow)
@allow_non_gpu(*non_supported_tz_allow)
def test_string_to_unix_timestamp(data_gen, date_form, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)),
@@ -427,7 +425,7 @@ def test_string_to_unix_timestamp_ansi_exception():

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn)
@allow_non_gpu(*non_utc_tz_allow)
@allow_non_gpu(*non_supported_tz_allow)
def test_string_unix_timestamp(data_gen, date_form, ansi_enabled):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)),
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/dpp_test.py
@@ -42,7 +42,7 @@ def fn(spark):
df.write.format(table_format) \
.mode("overwrite") \
.saveAsTable(table_name)
return df.select('filter').first()[0]
return df.select('filter').where("value > 0").first()[0]

return with_cpu_session(fn)

