Merge branch 'branch-23.12' into refactor_parquet_scan
# Conflicts:
#	sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala
#	sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
ttnghia committed Nov 6, 2023
2 parents edb6c81 + cec6977 commit 54e959f
Showing 10 changed files with 214 additions and 107 deletions.
32 changes: 16 additions & 16 deletions docs/compatibility.md
@@ -315,25 +315,25 @@ In the current version, nested types (array, struct, and map types) are not yet

This particular function supports outputting a map or struct type with limited functionality.

For struct output type, the function only supports struct of struct, array, string, and int types. The output is
incompatible if duplicated JSON key names are present in the input strings. For schemas that include IntegerType,
if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will cast the numbers to
IntegerType, whereas CPU Spark will return null.
The `from_json` function is disabled by default because it is experimental and has some known incompatibilities
with Spark. It can be enabled by setting `spark.rapids.sql.expression.JsonToStructs=true`.
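
For illustration, a minimal sketch (not part of this commit) of enabling the expression and calling
`from_json` with a struct schema from `spark-shell`; the input value and schema here are made up:

```
scala> spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")

scala> val df = Seq("""{"a": "x", "c": 1999}""").toDF("value")

scala> df.selectExpr("from_json(value, 'struct<a:string,c:int>') AS parsed").show()
```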

There are several known issues:

Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
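
As a hedged illustration of this issue (not from the commit; the column name is made up): with default
JSON options, CPU Spark fails to parse a leading-zero number and yields null, while the GPU accepts it:

```
scala> Seq("""{"n": 007}""").toDF("value").selectExpr("from_json(value, 'struct<n:int>') AS parsed").show()
```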

For struct output type, the function only supports struct of struct, array, string, integral, floating-point, and
decimal types. The output is incompatible if duplicated JSON key names are present in the input strings. For schemas
that include IntegerType, if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will
cast the numbers to IntegerType, whereas CPU Spark will return null.
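
A sketch of the IntegerType behavior described above (illustrative only; the value is arbitrary). Per
the note, the GPU is expected to cast the oversized number down to IntegerType while CPU Spark
returns null:

```
scala> Seq("""{"n": 99999999999999999999}""").toDF("value").selectExpr("from_json(value, 'struct<n:int>') AS parsed").show()
```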

In particular, the output map is not the result of regular JSON parsing; instead, it just contains the plain text of key-value pairs extracted directly from the input JSON string. Due to these limitations, the input JSON map-type schema must be exactly `MAP<STRING,STRING>`. Furthermore, no validation, error tolerance, data conversion, or string formatting is performed. This may lead to some minor differences in the output compared to the result of Spark CPU's `from_json`, such as:
* Floating point numbers in the input JSON string such as `1.2000` will not be reformatted to `1.2`. Instead, the output will be the same as the input.
* If the input JSON is given as multiple rows, any row containing invalid JSON will lead to an application crash. The Spark CPU version, on the other hand, just produces nulls for the invalid rows, as shown below:
```
scala> val df = Seq("{}", "BAD", "{\"A\": 100}").toDF
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.selectExpr("from_json(value, 'MAP<STRING,STRING>')").show()
+----------+
| entries|
+----------+
| {}|
| null|
|{A -> 100}|
+----------+
```
* If the input JSON is given as multiple rows, any row containing invalid JSON will be parsed as an empty
struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)).
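
A sketch of this struct-schema behavior (illustrative only; per the issue, the GPU is expected to
yield an empty struct for the invalid row where CPU Spark yields null):

```
scala> Seq("{}", "BAD", """{"a": "x"}""").toDF("value").selectExpr("from_json(value, 'struct<a:string>')").show()
```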

### `to_json` function

4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
59 changes: 46 additions & 13 deletions integration_tests/src/main/python/json_test.py
@@ -472,46 +472,79 @@ def test_from_json_map_fallback():
'JsonToStructs',
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<a:string>',
'struct<d:string>',
'struct<a:string,b:string>',
'struct<c:int,a:string>',
'struct<a:string,a:string>',
])
@pytest.mark.parametrize('schema', [
'struct<a:string>',
'struct<b:string>',
'struct<c:string>',
'struct<a:int>',
'struct<a:long>',
'struct<a:float>',
'struct<a:double>',
'struct<a:decimal>',
'struct<d:string>',
'struct<a:string,b:string>',
'struct<c:int,a:string>',
'struct<a:string,a:string>',
])
def test_from_json_struct(schema):
json_string_gen = StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1\d\d\d}').with_special_pattern('', weight=50)
# note that column 'a' does not use leading zeroes due to https://github.com/NVIDIA/spark-rapids/issues/9588
json_string_gen = StringGen(r'{"a": [1-9]{0,5}, "b": "[A-Z]{0,5}", "c": 1\d\d\d}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9597')
def test_from_json_struct_boolean():
json_string_gen = StringGen(r'{ "a": [truefalsTRUEFALS]{0,5}, "b": [0-9]{0,2} }') \
.with_special_pattern('{ "a": true, "b": 1 }', weight=50) \
.with_special_pattern('{ "a": false, "b": 0 }', weight=50) \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<a:boolean, b:boolean>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

def test_from_json_struct_decimal():
json_string_gen = StringGen(r'{ "a": "[+-]?([0-9]{0,5})?(\.[0-9]{0,2})?([eE][+-]?[0-9]{1,2})?" }') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<a:decimal>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:struct<name:string,age:int>>',
'struct<teacher:string,student:struct<name:string,age:int>>'])
def test_from_json_struct_of_struct(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}').with_special_pattern('', weight=50)
r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50) \
.with_special_pattern('invalid_entry', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:array<struct<name:string,class:string>>>',
'struct<teacher:string,student:array<struct<name:string,class:string>>>'])
def test_from_json_struct_of_list(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": \[{"name": "[A-Z]{1}[a-z]{2,5}", "class": "junior"},' \
r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}').with_special_pattern('', weight=50)
r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
def test_from_json_struct_all_empty_string_input(schema):
json_string_gen = StringGen('')
41 changes: 31 additions & 10 deletions integration_tests/src/main/python/parquet_test.py
@@ -312,14 +312,35 @@ def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)),
ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),

# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, delete this test and merge it
# into test_ts_read_round_trip. Nested timestamps and dates are not supported right now.
@pytest.mark.parametrize('gen', [ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126')
def test_parquet_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_round_trip(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
@@ -337,10 +358,10 @@ def readParquetCatchException(spark, data_path):
df = spark.read.parquet(data_path).collect()
assert e_info.match(r".*SparkUpgradeException.*")

# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc)),
ArrayGen(TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))))], ids=idfn)
# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, nested timestamps and dates should be added back in
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@@ -982,7 +1003,7 @@ def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp


conf_for_parquet_aggregate_pushdown = {
"spark.sql.parquet.aggregatePushdown": "true",
"spark.sql.parquet.aggregatePushdown": "true",
"spark.sql.sources.useV1SourceList": ""
}

Expand Down Expand Up @@ -1469,15 +1490,15 @@ def test_parquet_read_count(spark_tmp_path):
def test_read_case_col_name(spark_tmp_path, read_func, v1_enabled_list, reader_confs, col_name):
all_confs = copy_and_update(reader_confs, {
'spark.sql.sources.useV1SourceList': v1_enabled_list})
    gen_list =[('k0', LongGen(nullable=False, min_val=0, max_val=0)),
('k1', LongGen(nullable=False, min_val=1, max_val=1)),
('k2', LongGen(nullable=False, min_val=2, max_val=2)),
('k3', LongGen(nullable=False, min_val=3, max_val=3)),
('v0', LongGen()),
('v1', LongGen()),
('v2', LongGen()),
('v3', LongGen())]

gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/PAR_DATA'
reader = read_func(data_path)
7 changes: 6 additions & 1 deletion jenkins/Jenkinsfile-blossom.premerge-databricks
@@ -160,7 +160,12 @@ void databricksBuild() {
def BUILD_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -z ./spark-rapids-ci.tgz" +
" -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/build.sh -d /home/ubuntu/build.sh" +
" -v $BASE_SPARK_VERSION -i $BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS"
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"

// Retry the build step to help mitigate failures when downloading dependencies,
// since Maven/Sonatype can be quite unstable
retry(3) {
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"
}
}
}
}
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3569,17 +3569,20 @@ object GpuOverrides extends Logging {
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128) +
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
a.schema match {
case MapType(_: StringType, _: StringType, _) => ()
case _: StructType => ()
case _ =>
willNotWorkOnGpu("from_json on GPU only supports MapType<StringType, StringType> " +
"input schema")
"or StructType schema")
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -193,6 +193,21 @@ object GpuParquetScan {

FileFormatChecks.tag(meta, readSchema, ParquetFormatType, ReadFileOp)

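    // Does the read schema contain a timestamp column at any nesting level?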
val schemaHasTimestamps = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
}
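    // True for the date/time types that are subject to datetime rebase.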
def isTsOrDate(dt: DataType) : Boolean = dt match {
case TimestampType | DateType => true
case _ => false
}
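    // True when a date/timestamp is nested inside an array/struct/map; the rebase
    // checks below cannot support such nested values.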
val schemaMightNeedNestedRebase = readSchema.exists { field =>
if (DataTypeUtils.isNestedType(field.dataType)) {
TrampolineUtil.dataTypeExistsRecursively(field.dataType, isTsOrDate)
} else {
false
}
}

// Currently timestamp conversion is not supported.
// If support needs to be added then we need to follow the logic in Spark's
// ParquetPartitionReaderFactory and VectorizedColumnReader which essentially
@@ -202,30 +217,35 @@
// were written in that timezone and convert them to UTC timestamps.
// Essentially this should boil down to a vector subtract of the scalar delta
// between the configured timezone's delta from UTC on the timestamp data.
if (schemaHasTimestamps && sparkSession.sessionState.conf.isParquetINT96TimestampConversion) {
meta.willNotWorkOnGpu("GpuParquetScan does not support int96 timestamp conversion")
}

sqlConf.get(SparkShimImpl.int96ParquetRebaseReadKey) match {
case DateTimeRebaseException.value | DateTimeRebaseCorrected.value => // Good
case DateTimeRebaseLegacy.value =>
if (schemaHasTimestamps) {
meta.willNotWorkOnGpu("LEGACY rebase mode for dates and timestamps is not supported")
case DateTimeRebaseException.value => if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SparkShimImpl.int96ParquetRebaseReadKey} is EXCEPTION")
}
case DateTimeRebaseCorrected.value => // Good
      case DateTimeRebaseLegacy.value => // LEGACY is treated as EXCEPTION on the GPU
if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SparkShimImpl.int96ParquetRebaseReadKey} is LEGACY")
}
case other => meta.willNotWorkOnGpu(DateTimeRebaseUtils.invalidRebaseModeMessage(other))
}

sqlConf.get(SparkShimImpl.parquetRebaseReadKey) match {
case DateTimeRebaseException.value | DateTimeRebaseCorrected.value => // Good
case DateTimeRebaseLegacy.value =>
val schemaHasDates = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[DateType])
}
if (schemaHasDates || schemaHasTimestamps) {
meta.willNotWorkOnGpu("LEGACY rebase mode for dates and timestamps is not supported")
case DateTimeRebaseException.value => if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SparkShimImpl.parquetRebaseReadKey} is EXCEPTION")
}
      case DateTimeRebaseCorrected.value => // Good
      case DateTimeRebaseLegacy.value => // LEGACY is treated as EXCEPTION on the GPU
if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SparkShimImpl.parquetRebaseReadKey} is LEGACY")
}
case other => meta.willNotWorkOnGpu(DateTimeRebaseUtils.invalidRebaseModeMessage(other))
}