Skip to content

Commit

Permalink
Merge branch 'refactor_parquet_scan' into rebase_datatime
Browse files Browse the repository at this point in the history
# Conflicts:
#	sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
  • Loading branch information
ttnghia committed Nov 6, 2023
2 parents 791573c + 3f01690 commit 6d9c20b
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 83 deletions.
32 changes: 16 additions & 16 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,25 +315,25 @@ In the current version, nested types (array, struct, and map types) are not yet

This particular function supports to output a map or struct type with limited functionalities.

For struct output type, the function only supports struct of struct, array, string and int types. The output is incompatible if duplicated json key names are present in the input strings. For schemas that include IntegerType,
if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will cast the numbers to
IntegerType, whereas CPU Spark will return null.
The `from_json` function is disabled by default because it is experimental and has some known incompatibilities
with Spark, and can be enabled by setting `spark.rapids.sql.expression.JsonToStructs=true`.

There are several known issues:

Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).

When reading numeric values, the GPU implementation always supports leading zeros regardless of the setting
for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).

For struct output type, the function only supports struct of struct, array, string, integral, floating-point, and
decimal types. The output is incompatible if duplicated json key names are present in the input strings. For schemas
that include IntegerType, if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will
cast the numbers to IntegerType, whereas CPU Spark will return null.

In particular, the output map is not resulted from a regular JSON parsing but instead it will just contain plain text of key-value pairs extracted directly from the input JSON string. Due to such limitations, the input JSON map type schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, there is no validation, no error tolerance, no data conversion as well as string formatting is performed. This may lead to some minor differences in the output if compared to the result of Spark CPU's `from_json`, such as:
* Floating point numbers in the input JSON string such as `1.2000` will not be reformatted to `1.2`. Instead, the output will be the same as the input.
* If the input JSON is given as multiple rows, any row containing invalid JSON format will lead to an application crash. On the other hand, Spark CPU version just produces nulls for the invalid rows, as shown below:
```
scala> val df = Seq("{}", "BAD", "{\"A\": 100}").toDF
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.selectExpr("from_json(value, 'MAP<STRING,STRING>')").show()
+----------+
| entries|
+----------+
| {}|
| null|
|{A -> 100}|
+----------+
```
* If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty
struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)).

### `to_json` function

Expand Down
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -8141,8 +8141,8 @@ are limited.
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>MAP only supports keys and values that are of STRING type;<br/>unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td><em>PS<br/>unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
</tr>
<tr>
Expand Down
59 changes: 46 additions & 13 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,46 +472,79 @@ def test_from_json_map_fallback():
'JsonToStructs',
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<a:string>',
'struct<d:string>',
'struct<a:string,b:string>',
'struct<c:int,a:string>',
'struct<a:string,a:string>',
])
@pytest.mark.parametrize('schema', [
'struct<a:string>',
'struct<b:string>',
'struct<c:string>',
'struct<a:int>',
'struct<a:long>',
'struct<a:float>',
'struct<a:double>',
'struct<a:decimal>',
'struct<d:string>',
'struct<a:string,b:string>',
'struct<c:int,a:string>',
'struct<a:string,a:string>',
])
def test_from_json_struct(schema):
json_string_gen = StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1\d\d\d}').with_special_pattern('', weight=50)
# note that column 'a' does not use leading zeroes due to https://github.com/NVIDIA/spark-rapids/issues/9588
json_string_gen = StringGen(r'{"a": [1-9]{0,5}, "b": "[A-Z]{0,5}", "c": 1\d\d\d}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9597')
def test_from_json_struct_boolean():
json_string_gen = StringGen(r'{ "a": [truefalsTRUEFALS]{0,5}, "b": [0-9]{0,2} }') \
.with_special_pattern('{ "a": true, "b": 1 }', weight=50) \
.with_special_pattern('{ "a": false, "b": 0 }', weight=50) \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<a:boolean, b:boolean>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

def test_from_json_struct_decimal():
json_string_gen = StringGen(r'{ "a": "[+-]?([0-9]{0,5})?(\.[0-9]{0,2})?([eE][+-]?[0-9]{1,2})?" }') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<a:decimal>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:struct<name:string,age:int>>',
'struct<teacher:string,student:struct<name:string,age:int>>'])
def test_from_json_struct_of_struct(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}').with_special_pattern('', weight=50)
r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50) \
.with_special_pattern('invalid_entry', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:array<struct<name:string,class:string>>>',
'struct<teacher:string,student:array<struct<name:string,class:string>>>'])
def test_from_json_struct_of_list(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": \[{"name": "[A-Z]{1}[a-z]{2,5}", "class": "junior"},' \
r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}').with_special_pattern('', weight=50)
r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558')
@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
def test_from_json_struct_all_empty_string_input(schema):
json_string_gen = StringGen('')
Expand Down
7 changes: 6 additions & 1 deletion jenkins/Jenkinsfile-blossom.premerge-databricks
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ void databricksBuild() {
def BUILD_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -z ./spark-rapids-ci.tgz" +
" -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/build.sh -d /home/ubuntu/build.sh" +
" -v $BASE_SPARK_VERSION -i $BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS"
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"

// add retry for build step to try
// mitigate the issue of downloading dependencies while maven/sonatype is quite unstable
retry(3) {
sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3569,17 +3569,20 @@ object GpuOverrides extends Logging {
expr[JsonToStructs](
"Returns a struct value with the given `jsonStr` and `schema`",
ExprChecks.projectOnly(
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral +
TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128) +
TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP,
"MAP only supports keys and values that are of STRING type"),
(TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all),
Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))),
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
override def tagExprForGpu(): Unit =
a.schema match {
case MapType(_: StringType, _: StringType, _) => ()
case _: StructType => ()
case _ =>
willNotWorkOnGpu("from_json on GPU only supports MapType<StringType, StringType> " +
"input schema")
"or StructType schema")
}
GpuJsonScan.tagJsonToStructsSupport(a.options, this)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{ColumnView, DType, Scalar}
import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.SparkShimImpl

Expand Down Expand Up @@ -103,49 +103,54 @@ object DateTimeRebaseUtils {
SPARK_LEGACY_INT96_METADATA_KEY)
}

private def isRebaseNeeded(column: ColumnView, checkType: DType, minGood: Scalar): Boolean = {
private[this] def isDateRebaseNeeded(column: ColumnVector,
startDay: Int): Boolean = {
// TODO update this for nested column checks
// https://github.com/NVIDIA/spark-rapids/issues/1126
val dtype = column.getType
require(!dtype.hasTimeResolution || dtype == DType.TIMESTAMP_MICROSECONDS)
if (dtype == DType.TIMESTAMP_DAYS) {
val hasBad = withResource(Scalar.timestampDaysFromInt(startDay)) {
column.lessThan
}
val anyBad = withResource(hasBad) {
_.any()
}
withResource(anyBad) { _ =>
anyBad.isValid && anyBad.getBoolean
}
} else {
false
}
}

dtype match {
case `checkType` =>
private[this] def isTimeRebaseNeeded(column: ColumnVector,
startTs: Long): Boolean = {
val dtype = column.getType
if (dtype.hasTimeResolution) {
require(dtype == DType.TIMESTAMP_MICROSECONDS)
withResource(
Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood =>
withResource(column.lessThan(minGood)) { hasBad =>
withResource(hasBad.any()) { anyBad =>
anyBad.isValid && anyBad.getBoolean
withResource(hasBad.any()) { a =>
a.isValid && a.getBoolean
}
}

case DType.LIST | DType.STRUCT => (0 until column.getNumChildren).exists(i =>
withResource(column.getChildColumnView(i)) { child =>
isRebaseNeeded(child, checkType, minGood)
})

case _ => false
}
}

private def isDateRebaseNeeded(column: ColumnView, startDay: Int): Boolean = {
withResource(Scalar.timestampDaysFromInt(startDay)) { minGood =>
isRebaseNeeded(column, DType.TIMESTAMP_DAYS, minGood)
}
}

private def isTimeRebaseNeeded(column: ColumnView, startTs: Long): Boolean = {
withResource(Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood =>
isRebaseNeeded(column, DType.TIMESTAMP_MICROSECONDS, minGood)
}
} else {
false
}
}

def isDateRebaseNeededInRead(column: ColumnView): Boolean =
def isDateRebaseNeededInRead(column: ColumnVector): Boolean =
isDateRebaseNeeded(column, RebaseDateTime.lastSwitchJulianDay)

def isTimeRebaseNeededInRead(column: ColumnView): Boolean =
def isTimeRebaseNeededInRead(column: ColumnVector): Boolean =
isTimeRebaseNeeded(column, RebaseDateTime.lastSwitchJulianTs)

def isDateRebaseNeededInWrite(column: ColumnView): Boolean =
def isDateRebaseNeededInWrite(column: ColumnVector): Boolean =
isDateRebaseNeeded(column, RebaseDateTime.lastSwitchGregorianDay)

def isTimeRebaseNeededInWrite(column: ColumnView): Boolean =
def isTimeRebaseNeededInWrite(column: ColumnVector): Boolean =
isTimeRebaseNeeded(column, RebaseDateTime.lastSwitchGregorianTs)

def newRebaseExceptionInRead(format: String): Exception = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuCast.doCast
Expand Down Expand Up @@ -68,27 +69,46 @@ case class GpuJsonToStructs(
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned =>
withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(cudf.Scalar.fromString("\r")) { returnSep =>
withResource(cleaned.stringContains(lineSep)) { inputHas =>
withResource(inputHas.any()) { anyLineSep =>
if (anyLineSep.isValid && anyLineSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a line separator in it")
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { nullsReplaced =>
val isLiteralNull = withResource(Scalar.fromString("null")) { literalNull =>
nullsReplaced.equalTo(literalNull)
}
withResource(isLiteralNull) { _ =>
withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(cudf.Scalar.fromString("\r")) { returnSep =>
withResource(cleaned.stringContains(lineSep)) { inputHas =>
withResource(inputHas.any()) { anyLineSep =>
if (anyLineSep.isValid && anyLineSep.getBoolean) {
throw new IllegalArgumentException(
"We cannot currently support parsing " +
"JSON that contains a line separator in it")
}
}
}
withResource(cleaned.stringContains(returnSep)) { inputHas =>
withResource(inputHas.any()) { anyReturnSep =>
if (anyReturnSep.isValid && anyReturnSep.getBoolean) {
throw new IllegalArgumentException(
"We cannot currently support parsing " +
"JSON that contains a carriage return in it")
}
}
}
}
}
withResource(cleaned.stringContains(returnSep)) { inputHas =>
withResource(inputHas.any()) { anyReturnSep =>
if (anyReturnSep.isValid && anyReturnSep.getBoolean) {
throw new IllegalArgumentException("We cannot currently support parsing " +
"JSON that contains a carriage return in it")

// if the last entry in a column is incomplete or invalid, then cuDF
// will drop the row rather than replace with null if there is no newline, so we
// add a newline here to prevent that
val joined = withResource(cleaned.joinStrings(lineSep, emptyRow)) { joined =>
withResource(ColumnVector.fromStrings("\n")) { newline =>
ColumnVector.stringConcatenate(Array[ColumnView](joined, newline))
}
}

(isNullOrEmptyInput, joined)
}
}
(isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow))
}
}
}
Expand Down Expand Up @@ -160,8 +180,8 @@ case class GpuJsonToStructs(
val end = combinedHost.getEndListOffset(0)
val length = end - start

withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start,
length)) { tableWithMeta =>
val jsonOptions = cudf.JSONOptions.builder().withRecoverWithNull(true).build()
withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta =>
val names = tableWithMeta.getColumnNames
(names, tableWithMeta.releaseTable())
}
Expand Down
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N
IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA
JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA
JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
Expand Down

0 comments on commit 6d9c20b

Please sign in to comment.