diff --git a/docs/compatibility.md b/docs/compatibility.md
index b05b8d072f2..493d37efd00 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -315,25 +315,25 @@ In the current version, nested types (array, struct, and map types) are not yet
 This particular function supports outputting a map or struct type with limited functionality.
-For struct output type, the function only supports struct of struct, array, string and int types. The output is incompatible if duplicated json key names are present in the input strings. For schemas that include IntegerType,
-if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will cast the numbers to
-IntegerType, whereas CPU Spark will return null.
+The `from_json` function is disabled by default because it is experimental and has some known incompatibilities
+with Spark. It can be enabled by setting `spark.rapids.sql.expression.JsonToStructs=true`.
+
+There are several known issues:
+
+Dates and timestamps are not supported ([#9590](https://github.com/NVIDIA/spark-rapids/issues/9590)).
+
+When reading numeric values, the GPU implementation always allows leading zeros regardless of the setting
+for the JSON option `allowNumericLeadingZeros` ([#9588](https://github.com/NVIDIA/spark-rapids/issues/9588)).
+
+For the struct output type, the function only supports struct of struct, array, string, integral, floating-point, and
+decimal types. The output is incompatible if duplicate JSON key names are present in the input strings. For schemas
+that include IntegerType, if arbitrarily large numbers are specified in the JSON strings, the GPU implementation will
+cast the numbers to IntegerType, whereas CPU Spark will return null.
 In particular, the output map is not produced by regular JSON parsing; instead, it simply contains the plain text of
 the key-value pairs extracted directly from the input JSON string. Due to these limitations, the input JSON map type
 schema must be `MAP<STRING,STRING>` and nothing else. Furthermore, no validation, error tolerance, data conversion,
 or string formatting is performed. This may lead to some minor differences in the output compared to the result of
 CPU Spark's `from_json`, such as:
  * Floating point numbers in the input JSON string such as `1.2000` will not be reformatted to `1.2`. Instead, the output will be the same as the input.
- * If the input JSON is given as multiple rows, any row containing invalid JSON format will lead to an application crash. On the other hand, Spark CPU version just produces nulls for the invalid rows, as shown below:
- ```
-scala> val df = Seq("{}", "BAD", "{\"A\": 100}").toDF
-df: org.apache.spark.sql.DataFrame = [value: string]
-scala> df.selectExpr("from_json(value, 'MAP<STRING,STRING>')").show()
-+----------+
-|   entries|
-+----------+
-|        {}|
-|      null|
-|{A -> 100}|
-+----------+
-```
+ * If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty
+   struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)).
 ### `to_json` function
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 4464500b737..8a8a5c52066 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -8141,8 +8141,8 @@ are limited.
 NS
-PS
MAP only supports keys and values that are of STRING type;
unsupported child types BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP, DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT
-NS
+PS
MAP only supports keys and values that are of STRING type;
unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
+PS
unsupported child types BOOLEAN, DATE, TIMESTAMP, NULL, BINARY, CALENDAR, MAP, UDT
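To make the documented `from_json` behavior above concrete, here is a minimal PySpark sketch (not part of this patch). The plugin class `com.nvidia.spark.SQLPlugin` and the config key `spark.rapids.sql.expression.JsonToStructs` come from the RAPIDS Accelerator documentation; the sample data, schema, and session settings are illustrative assumptions.

```python
# Minimal sketch: enabling the experimental GPU from_json (JsonToStructs) support
# described above and parsing JSON strings into a struct column.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession.builder
         .appName("from-json-struct-sketch")
         # Standard RAPIDS Accelerator plugin setup (jar assumed to be on the classpath).
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         # from_json on GPU is disabled by default; opt in explicitly.
         .config("spark.rapids.sql.expression.JsonToStructs", "true")
         .getOrCreate())

df = spark.createDataFrame(
    [('{"a": 1, "b": "XYZ"}',), ('{"a": 42}',), ("not valid json",)],
    ["value"])

# Struct output schemas are limited to struct, array, string, integral,
# floating-point, and decimal children, per the compatibility notes above.
# On the GPU an invalid row is currently parsed as an empty struct rather than
# null (issue #9592), so the last row may differ from CPU Spark.
df.select(f.from_json("value", "struct<a:int,b:string>").alias("parsed")) \
  .show(truncate=False)
```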
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py index 9efd6bcca4b..7b6ff992419 100644 --- a/integration_tests/src/main/python/json_test.py +++ b/integration_tests/src/main/python/json_test.py @@ -472,46 +472,79 @@ def test_from_json_map_fallback(): 'JsonToStructs', conf={"spark.rapids.sql.expression.JsonToStructs": True}) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558') -@pytest.mark.parametrize('schema', ['struct', - 'struct', - 'struct', - 'struct', - 'struct', - ]) +@pytest.mark.parametrize('schema', [ + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + 'struct', + ]) def test_from_json_struct(schema): - json_string_gen = StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1\d\d\d}').with_special_pattern('', weight=50) + # note that column 'a' does not use leading zeroes due to https://github.com/NVIDIA/spark-rapids/issues/9588 + json_string_gen = StringGen(r'{"a": [1-9]{0,5}, "b": "[A-Z]{0,5}", "c": 1\d\d\d}') \ + .with_special_pattern('', weight=50) \ + .with_special_pattern('null', weight=50) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, json_string_gen) \ .select(f.from_json('a', schema)), conf={"spark.rapids.sql.expression.JsonToStructs": True}) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558') +@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9597') +def test_from_json_struct_boolean(): + json_string_gen = StringGen(r'{ "a": [truefalsTRUEFALS]{0,5}, "b": [0-9]{0,2} }') \ + .with_special_pattern('{ "a": true, "b": 1 }', weight=50) \ + .with_special_pattern('{ "a": false, "b": 0 }', weight=50) \ + .with_special_pattern('', weight=50) \ + .with_special_pattern('null', weight=50) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.from_json('a', 'struct')), + conf={"spark.rapids.sql.expression.JsonToStructs": True}) + +def test_from_json_struct_decimal(): + json_string_gen = StringGen(r'{ "a": "[+-]?([0-9]{0,5})?(\.[0-9]{0,2})?([eE][+-]?[0-9]{1,2})?" 
}') \ + .with_special_pattern('', weight=50) \ + .with_special_pattern('null', weight=50) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, json_string_gen) \ + .select(f.from_json('a', 'struct')), + conf={"spark.rapids.sql.expression.JsonToStructs": True}) + @pytest.mark.parametrize('schema', ['struct', 'struct>', 'struct>']) def test_from_json_struct_of_struct(schema): json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \ - r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}').with_special_pattern('', weight=50) + r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}') \ + .with_special_pattern('', weight=50) \ + .with_special_pattern('null', weight=50) \ + .with_special_pattern('invalid_entry', weight=50) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, json_string_gen) \ .select(f.from_json('a', schema)), conf={"spark.rapids.sql.expression.JsonToStructs": True}) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558') @pytest.mark.parametrize('schema', ['struct', 'struct>>', 'struct>>']) def test_from_json_struct_of_list(schema): json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \ r'"student": \[{"name": "[A-Z]{1}[a-z]{2,5}", "class": "junior"},' \ - r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}').with_special_pattern('', weight=50) + r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}') \ + .with_special_pattern('', weight=50) \ + .with_special_pattern('null', weight=50) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, json_string_gen) \ .select(f.from_json('a', schema)), conf={"spark.rapids.sql.expression.JsonToStructs": True}) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8558') @pytest.mark.parametrize('schema', ['struct', 'struct']) def test_from_json_struct_all_empty_string_input(schema): json_string_gen = StringGen('') diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 51b327dfd70..b3e04b91d93 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -312,14 +312,35 @@ def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1 parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'] -# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with timestamp_gen -@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), - ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))), + +# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed delete this test and merge it +# into test_ts_read_round_trip nested timestamps and dates are not supported right now. 
+@pytest.mark.parametrize('gen', [ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))), ArrayGen(ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))))], ids=idfn) @pytest.mark.parametrize('ts_write', parquet_ts_write_options) @pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY']) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126') +def test_parquet_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): + data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark : unary_op_df(spark, gen).write.parquet(data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, + 'spark.sql.legacy.parquet.int96RebaseModeInWrite': ts_rebase, + 'spark.sql.parquet.outputTimestampType': ts_write}) + all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path), + conf=all_confs) + +# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with +# timestamp_gen +@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], ids=idfn) +@pytest.mark.parametrize('ts_write', parquet_ts_write_options) +@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY']) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_ts_read_round_trip(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -337,10 +358,10 @@ def readParquetCatchException(spark, data_path): df = spark.read.parquet(data_path).collect() assert e_info.match(r".*SparkUpgradeException.*") -# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with timestamp_gen -@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc)), - ArrayGen(TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))), - ArrayGen(ArrayGen(TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))))], ids=idfn) +# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed nested timestamps and dates should be added in +# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with +# timestamp_gen +@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], ids=idfn) @pytest.mark.parametrize('ts_write', parquet_ts_write_options) @pytest.mark.parametrize('ts_rebase', ['LEGACY']) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @@ -982,7 +1003,7 @@ def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp conf_for_parquet_aggregate_pushdown = { - "spark.sql.parquet.aggregatePushdown": "true", + "spark.sql.parquet.aggregatePushdown": "true", "spark.sql.sources.useV1SourceList": "" } @@ -1469,7 +1490,7 @@ def test_parquet_read_count(spark_tmp_path): def test_read_case_col_name(spark_tmp_path, read_func, v1_enabled_list, reader_confs, col_name): all_confs = copy_and_update(reader_confs, { 'spark.sql.sources.useV1SourceList': v1_enabled_list}) - gen_list =[('k0', LongGen(nullable=False, min_val=0, max_val=0)), + gen_list =[('k0', LongGen(nullable=False, min_val=0, max_val=0)), ('k1', LongGen(nullable=False, 
min_val=1, max_val=1)), ('k2', LongGen(nullable=False, min_val=2, max_val=2)), ('k3', LongGen(nullable=False, min_val=3, max_val=3)), @@ -1477,7 +1498,7 @@ def test_read_case_col_name(spark_tmp_path, read_func, v1_enabled_list, reader_c ('v1', LongGen()), ('v2', LongGen()), ('v3', LongGen())] - + gen = StructGen(gen_list, nullable=False) data_path = spark_tmp_path + '/PAR_DATA' reader = read_func(data_path) diff --git a/jenkins/Jenkinsfile-blossom.premerge-databricks b/jenkins/Jenkinsfile-blossom.premerge-databricks index 8c871414840..0ea835d39a9 100644 --- a/jenkins/Jenkinsfile-blossom.premerge-databricks +++ b/jenkins/Jenkinsfile-blossom.premerge-databricks @@ -160,7 +160,12 @@ void databricksBuild() { def BUILD_PARAMS = " -w $DATABRICKS_HOST -t $DATABRICKS_TOKEN -c $CLUSTER_ID -z ./spark-rapids-ci.tgz" + " -p $DATABRICKS_PRIVKEY -l ./jenkins/databricks/build.sh -d /home/ubuntu/build.sh" + " -v $BASE_SPARK_VERSION -i $BASE_SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS" - sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS" + + // add retry for build step to try + // mitigate the issue of downloading dependencies while maven/sonatype is quite unstable + retry(3) { + sh "python3 ./jenkins/databricks/run-build.py $BUILD_PARAMS" + } } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a307e3cfade..a0585310d37 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3569,17 +3569,20 @@ object GpuOverrides extends Logging { expr[JsonToStructs]( "Returns a struct value with the given `jsonStr` and `schema`", ExprChecks.projectOnly( - TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP, - "MAP only supports keys and values that are of STRING type"), + TypeSig.STRUCT.nested(TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.STRING + TypeSig.integral + + TypeSig.fp + TypeSig.DECIMAL_64 + TypeSig.DECIMAL_128) + + TypeSig.MAP.nested(TypeSig.STRING).withPsNote(TypeEnum.MAP, + "MAP only supports keys and values that are of STRING type"), (TypeSig.STRUCT + TypeSig.MAP + TypeSig.ARRAY).nested(TypeSig.all), Seq(ParamCheck("jsonStr", TypeSig.STRING, TypeSig.STRING))), (a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) { override def tagExprForGpu(): Unit = a.schema match { case MapType(_: StringType, _: StringType, _) => () + case _: StructType => () case _ => willNotWorkOnGpu("from_json on GPU only supports MapType " + - "input schema") + "or StructType schema") } GpuJsonScan.tagJsonToStructsSupport(a.options, this) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 6c4758ad6d7..551affd1916 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -193,6 +193,21 @@ object GpuParquetScan { FileFormatChecks.tag(meta, readSchema, ParquetFormatType, ReadFileOp) + val schemaHasTimestamps = readSchema.exists { field => + TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) + } + def isTsOrDate(dt: DataType) : Boolean = dt match { + case TimestampType | DateType => true + case _ => false + } + val schemaMightNeedNestedRebase = readSchema.exists { field => + if (DataTypeUtils.isNestedType(field.dataType)) { + 
TrampolineUtil.dataTypeExistsRecursively(field.dataType, isTsOrDate) + } else { + false + } + } + // Currently timestamp conversion is not supported. // If support needs to be added then we need to follow the logic in Spark's // ParquetPartitionReaderFactory and VectorizedColumnReader which essentially @@ -202,30 +217,35 @@ object GpuParquetScan { // were written in that timezone and convert them to UTC timestamps. // Essentially this should boil down to a vector subtract of the scalar delta // between the configured timezone's delta from UTC on the timestamp data. - val schemaHasTimestamps = readSchema.exists { field => - TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) - } if (schemaHasTimestamps && sparkSession.sessionState.conf.isParquetINT96TimestampConversion) { meta.willNotWorkOnGpu("GpuParquetScan does not support int96 timestamp conversion") } sqlConf.get(SparkShimImpl.int96ParquetRebaseReadKey) match { - case DateTimeRebaseException.value | DateTimeRebaseCorrected.value => // Good - case DateTimeRebaseLegacy.value => - if (schemaHasTimestamps) { - meta.willNotWorkOnGpu("LEGACY rebase mode for dates and timestamps is not supported") + case DateTimeRebaseException.value => if (schemaMightNeedNestedRebase) { + meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " + + s"${SparkShimImpl.int96ParquetRebaseReadKey} is EXCEPTION") + } + case DateTimeRebaseCorrected.value => // Good + case DateTimeRebaseLegacy.value => // really is EXCEPTION for us... + if (schemaMightNeedNestedRebase) { + meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " + + s"${SparkShimImpl.int96ParquetRebaseReadKey} is LEGACY") } case other => meta.willNotWorkOnGpu(DateTimeRebaseUtils.invalidRebaseModeMessage(other)) } sqlConf.get(SparkShimImpl.parquetRebaseReadKey) match { - case DateTimeRebaseException.value | DateTimeRebaseCorrected.value => // Good - case DateTimeRebaseLegacy.value => - val schemaHasDates = readSchema.exists { field => - TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[DateType]) - } - if (schemaHasDates || schemaHasTimestamps) { - meta.willNotWorkOnGpu("LEGACY rebase mode for dates and timestamps is not supported") + case DateTimeRebaseException.value => if (schemaMightNeedNestedRebase) { + meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " + + s"${SparkShimImpl.parquetRebaseReadKey} is EXCEPTION") + } + case DateTimeRebaseCorrected.value + => // Good + case DateTimeRebaseLegacy.value => // really is EXCEPTION for us... 
+ if (schemaMightNeedNestedRebase) { + meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " + + s"${SparkShimImpl.parquetRebaseReadKey} is LEGACY") } case other => meta.willNotWorkOnGpu(DateTimeRebaseUtils.invalidRebaseModeMessage(other)) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala index 300c5e32884..c78740dceea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/datetimeRebaseUtils.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{ColumnView, DType, Scalar} +import ai.rapids.cudf.{ColumnVector, DType, Scalar} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.shims.SparkShimImpl @@ -103,49 +103,54 @@ object DateTimeRebaseUtils { SPARK_LEGACY_INT96_METADATA_KEY) } - private def isRebaseNeeded(column: ColumnView, checkType: DType, minGood: Scalar): Boolean = { + private[this] def isDateRebaseNeeded(column: ColumnVector, + startDay: Int): Boolean = { + // TODO update this for nested column checks + // https://github.com/NVIDIA/spark-rapids/issues/1126 val dtype = column.getType - require(!dtype.hasTimeResolution || dtype == DType.TIMESTAMP_MICROSECONDS) + if (dtype == DType.TIMESTAMP_DAYS) { + val hasBad = withResource(Scalar.timestampDaysFromInt(startDay)) { + column.lessThan + } + val anyBad = withResource(hasBad) { + _.any() + } + withResource(anyBad) { _ => + anyBad.isValid && anyBad.getBoolean + } + } else { + false + } + } - dtype match { - case `checkType` => + private[this] def isTimeRebaseNeeded(column: ColumnVector, + startTs: Long): Boolean = { + val dtype = column.getType + if (dtype.hasTimeResolution) { + require(dtype == DType.TIMESTAMP_MICROSECONDS) + withResource( + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood => withResource(column.lessThan(minGood)) { hasBad => - withResource(hasBad.any()) { anyBad => - anyBad.isValid && anyBad.getBoolean + withResource(hasBad.any()) { a => + a.isValid && a.getBoolean } } - - case DType.LIST | DType.STRUCT => (0 until column.getNumChildren).exists(i => - withResource(column.getChildColumnView(i)) { child => - isRebaseNeeded(child, checkType, minGood) - }) - - case _ => false - } - } - - private def isDateRebaseNeeded(column: ColumnView, startDay: Int): Boolean = { - withResource(Scalar.timestampDaysFromInt(startDay)) { minGood => - isRebaseNeeded(column, DType.TIMESTAMP_DAYS, minGood) - } - } - - private def isTimeRebaseNeeded(column: ColumnView, startTs: Long): Boolean = { - withResource(Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood => - isRebaseNeeded(column, DType.TIMESTAMP_MICROSECONDS, minGood) + } + } else { + false } } - def isDateRebaseNeededInRead(column: ColumnView): Boolean = + def isDateRebaseNeededInRead(column: ColumnVector): Boolean = isDateRebaseNeeded(column, RebaseDateTime.lastSwitchJulianDay) - def isTimeRebaseNeededInRead(column: ColumnView): Boolean = + def isTimeRebaseNeededInRead(column: ColumnVector): Boolean = isTimeRebaseNeeded(column, RebaseDateTime.lastSwitchJulianTs) - def isDateRebaseNeededInWrite(column: ColumnView): Boolean = + def isDateRebaseNeededInWrite(column: ColumnVector): Boolean = isDateRebaseNeeded(column, RebaseDateTime.lastSwitchGregorianDay) - def isTimeRebaseNeededInWrite(column: ColumnView): Boolean = + def 
isTimeRebaseNeededInWrite(column: ColumnVector): Boolean = isTimeRebaseNeeded(column, RebaseDateTime.lastSwitchGregorianTs) def newRebaseExceptionInRead(format: String): Exception = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala index 96b4c31f3c9..4d4fb1039a1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf +import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar} import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuCast.doCast @@ -68,27 +69,46 @@ case class GpuJsonToStructs( } } closeOnExcept(isNullOrEmptyInput) { _ => - withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { cleaned => - withResource(cudf.Scalar.fromString("\n")) { lineSep => - withResource(cudf.Scalar.fromString("\r")) { returnSep => - withResource(cleaned.stringContains(lineSep)) { inputHas => - withResource(inputHas.any()) { anyLineSep => - if (anyLineSep.isValid && anyLineSep.getBoolean) { - throw new IllegalArgumentException("We cannot currently support parsing " + - "JSON that contains a line separator in it") + withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { nullsReplaced => + val isLiteralNull = withResource(Scalar.fromString("null")) { literalNull => + nullsReplaced.equalTo(literalNull) + } + withResource(isLiteralNull) { _ => + withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned => + withResource(cudf.Scalar.fromString("\n")) { lineSep => + withResource(cudf.Scalar.fromString("\r")) { returnSep => + withResource(cleaned.stringContains(lineSep)) { inputHas => + withResource(inputHas.any()) { anyLineSep => + if (anyLineSep.isValid && anyLineSep.getBoolean) { + throw new IllegalArgumentException( + "We cannot currently support parsing " + + "JSON that contains a line separator in it") + } + } + } + withResource(cleaned.stringContains(returnSep)) { inputHas => + withResource(inputHas.any()) { anyReturnSep => + if (anyReturnSep.isValid && anyReturnSep.getBoolean) { + throw new IllegalArgumentException( + "We cannot currently support parsing " + + "JSON that contains a carriage return in it") + } + } } } - } - withResource(cleaned.stringContains(returnSep)) { inputHas => - withResource(inputHas.any()) { anyReturnSep => - if (anyReturnSep.isValid && anyReturnSep.getBoolean) { - throw new IllegalArgumentException("We cannot currently support parsing " + - "JSON that contains a carriage return in it") + + // if the last entry in a column is incomplete or invalid, then cuDF + // will drop the row rather than replace with null if there is no newline, so we + // add a newline here to prevent that + val joined = withResource(cleaned.joinStrings(lineSep, emptyRow)) { joined => + withResource(ColumnVector.fromStrings("\n")) { newline => + ColumnVector.stringConcatenate(Array[ColumnView](joined, newline)) } } + + (isNullOrEmptyInput, joined) } } - (isNullOrEmptyInput, cleaned.joinStrings(lineSep, emptyRow)) } } } @@ -160,8 +180,8 @@ case class GpuJsonToStructs( val end = combinedHost.getEndListOffset(0) val length = end - start - withResource(cudf.Table.readJSON(cudf.JSONOptions.DEFAULT, data, start, - length)) { tableWithMeta => + val jsonOptions 
= cudf.JSONOptions.builder().withRecoverWithNull(true).build() + withResource(cudf.Table.readJSON(jsonOptions, data, start, length)) { tableWithMeta => val names = tableWithMeta.getColumnNames (names, tableWithMeta.releaseTable()) } diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 544633301d4..f57e384a25d 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -276,7 +276,7 @@ IsNotNull,S,`isnotnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N IsNull,S,`isnull`,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS IsNull,S,`isnull`,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,jsonStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA -JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,NS,NA +JsonToStructs,NS,`from_json`,This is disabled by default because parsing JSON from a column has a large number of issues and should be considered beta quality right now.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NS,PS,PS,NA JsonTuple,S,`json_tuple`,None,project,json,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA JsonTuple,S,`json_tuple`,None,project,field,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA JsonTuple,S,`json_tuple`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
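The `GpuJsonToStructs` changes above (replacing literal `null` inputs with an empty row, enabling cuDF's recover-with-null JSON parsing, and appending a trailing newline so a malformed final row is not dropped) all aim to match how leniently CPU Spark treats bad input. The following is a small CPU-Spark sketch of that baseline behavior, not part of this patch; the data and schema are illustrative, and per issue #9592 the GPU currently returns empty structs where the CPU returns nulls.

```python
# Sketch of the lenient CPU from_json behavior that the GPU implementation
# approximates: valid JSON, an empty string, the literal string "null", and a
# malformed final row (the case the trailing-newline workaround targets).
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.appName("from-json-lenient-input-sketch").getOrCreate()

df = spark.createDataFrame(
    [('{"a": 1}',), ("",), ("null",), ("{ invalid",)],
    ["value"])

# In the default PERMISSIVE parsing mode, CPU Spark does not fail on the bad rows;
# it yields null results for them instead of raising an error.
df.select("value", f.from_json("value", "struct<a:int>").alias("parsed")) \
  .show(truncate=False)
```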