diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index fe1d9064933..4ed627a72fc 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -1454,3 +1454,11 @@ def test_spark_from_json_invalid_json():
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : spark.createDataFrame(data, 'json STRING').select(f.col('json'), f.from_json(f.col('json'), schema)),
         conf =_enable_all_types_conf)
+
+@allow_non_gpu(*non_utc_allow)
+def test_from_json_input_wrapped_in_whitespaces():
+    json_string_gen = StringGen(r'[ \r\n\t]{0,5}({"key":( |\r|\n|\t|)"[A-Za-z]{0,5}"}|null|invalid|)[ \r\n\t]{0,5}')
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark : unary_op_df(spark, json_string_gen) \
+            .select(f.from_json('a', 'struct<key:string>')),
+        conf=_enable_all_types_conf)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
index c593eebe26e..b614a9b170f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonReadCommon.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids
 
 import java.util.Locale
 
-import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar, Schema, Table}
+import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, NvtxColor, NvtxRange, Scalar, Schema, Table}
 import com.fasterxml.jackson.core.JsonParser
 import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader}
 import com.nvidia.spark.rapids.Arm.withResource
@@ -311,21 +311,26 @@ object GpuJsonReadCommon {
   def convertTableToDesiredType(table: Table,
       desired: StructType,
       options: JSONOptions): Array[ColumnVector] = {
-    val dataTypes = desired.fields.map(_.dataType)
-    dataTypes.zipWithIndex.safeMap {
-      case (dt, i) =>
-        convertToDesiredType(table.getColumn(i), dt, options)
+    withResource(new NvtxRange("convertTableToDesiredType", NvtxColor.RED)) { _ =>
+      val dataTypes = desired.fields.map(_.dataType)
+      dataTypes.zipWithIndex.safeMap {
+        case (dt, i) =>
+          convertToDesiredType(table.getColumn(i), dt, options)
+      }
     }
   }
 
-  def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
+  def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions =
+    cudfJsonOptionBuilder(options).build()
+
+  def cudfJsonOptionBuilder(options: JSONOptions): ai.rapids.cudf.JSONOptions.Builder = {
     // This is really ugly, but options.allowUnquotedControlChars is marked as private
     // and this is the only way I know to get it without even uglier tricks
     @scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
       "Java enum Feature is deprecated")
-    val allowUnquotedControlChars =
-      options.buildJsonFactory()
-        .isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
+    val allowUnquotedControlChars = options.buildJsonFactory()
+      .isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
+
     ai.rapids.cudf.JSONOptions.builder()
       .withRecoverWithNull(true)
       .withMixedTypesAsStrings(true)
@@ -338,6 +343,5 @@ object GpuJsonReadCommon {
       .withUnquotedControlChars(allowUnquotedControlChars)
      .withCudfPruneSchema(true)
       .withExperimental(true)
-      .build()
   }
 }
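Note on the `GpuJsonReadCommon` change above: `cudfJsonOptions` previously built a finished `ai.rapids.cudf.JSONOptions` up front, but the line delimiter is now only known per batch, so the fixed defaults move into `cudfJsonOptionBuilder` and the builder is finished at read time (the `NvtxRange` additions are profiling markers only). A minimal sketch of the intended call pattern; `finishForBatch` is an illustrative name, not part of the patch, and the delimiter is assumed to be carried as a `Char` since it is passed straight to `withLineDelimiter`:

```scala
import ai.rapids.cudf.JSONOptions

// Sketch: cache the builder once per expression, then complete it with the
// per-batch line delimiter right before handing the options to Table.readJSON.
def finishForBatch(builder: JSONOptions.Builder, delimiter: Char): JSONOptions =
  builder
    .withLineDelimiter(delimiter) // varies per batch; everything else is fixed
    .build()
```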
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
index e60aefb8d59..a62aba24760 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuJsonToStructs.scala
@@ -17,11 +17,10 @@
 package org.apache.spark.sql.rapids
 
 import ai.rapids.cudf
-import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar}
-import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc}
+import ai.rapids.cudf.{ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange}
+import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression, HostAlloc}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.jni.JSONUtils
-import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.json.JSONOptions
@@ -34,8 +33,7 @@ import org.apache.spark.sql.types._
  */
 class JsonParsingException(s: String, cause: Throwable) extends RuntimeException(s, cause) {}
 
-class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
-  lazy val data: BaseDeviceMemoryBuffer = combined.getData
+class JsonDeviceDataSource(data: DeviceMemoryBuffer) extends DataSource {
   lazy val totalSize: Long = data.getLength
 
   override def size(): Long = totalSize
@@ -64,11 +62,6 @@ class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
     dest.copyFromDeviceBufferAsync(0, data, offset, length, stream)
     length
   }
-
-  override def close(): Unit = {
-    combined.close()
-    super.close()
-  }
 }
 
 case class GpuJsonToStructs(
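The `JsonDeviceDataSource` rewrite above also flips its ownership model: it used to own the combined strings column and close it in `close()`, while now it only reads from a `DeviceMemoryBuffer` whose lifetime belongs to the caller, i.e. the `withResource(concatenated)` scope in `doColumnar` below. A rough sketch of that contract, with the hypothetical `withJsonSource` helper standing in for the real call site:

```scala
import ai.rapids.cudf.DeviceMemoryBuffer
import com.nvidia.spark.rapids.Arm.withResource

// Sketch: the caller keeps `buffer` alive for the whole read and closes it
// afterwards; the data source no longer frees memory it did not allocate.
def withJsonSource[T](buffer: DeviceMemoryBuffer)(parse: JsonDeviceDataSource => T): T =
  withResource(new JsonDeviceDataSource(buffer)) { ds =>
    parse(ds) // `buffer` must outlive this call
  }
```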
@@ -80,142 +73,67 @@ case class GpuJsonToStructs(
     with NullIntolerant {
   import GpuJsonReadCommon._
 
-  private lazy val emptyRowStr = constructEmptyRow(schema)
-
-  private def constructEmptyRow(schema: DataType): String = {
-    schema match {
-      case struct: StructType if struct.fields.nonEmpty =>
-        s"""{"${StringEscapeUtils.escapeJson(struct.head.name)}":null}"""
-      case other =>
-        throw new IllegalArgumentException(s"$other is not supported as a top level type")
-    }
-  }
-
-  private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
-    val stripped = if (input.getData == null) {
-      input.incRefCount
-    } else {
-      withResource(cudf.Scalar.fromString(" ")) { space =>
-        input.strip(space)
-      }
-    }
-
-    withResource(stripped) { stripped =>
-      val isEmpty = withResource(stripped.getByteCount) { lengths =>
-        withResource(cudf.Scalar.fromInt(0)) { zero =>
-          lengths.lessOrEqualTo(zero)
-        }
-      }
-      val isNullOrEmptyInput = withResource(isEmpty) { _ =>
-        withResource(input.isNull) { isNull =>
-          isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
-        }
-      }
-      closeOnExcept(isNullOrEmptyInput) { _ =>
-        withResource(cudf.Scalar.fromString(emptyRowStr)) { emptyRow =>
-          // TODO is it worth checking if any are empty or null and then skipping this?
-          withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { nullsReplaced =>
-            val isLiteralNull = withResource(Scalar.fromString("null")) { literalNull =>
-              nullsReplaced.equalTo(literalNull)
-            }
-            withResource(isLiteralNull) { _ =>
-              withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
-                checkForNewline(cleaned, "\n", "line separator")
-                checkForNewline(cleaned, "\r", "carriage return")
-
-                // add a newline to each JSON line
-                val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep =>
-                  withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) {
-                    newLineCol =>
-                      ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol))
-                  }
-                }
-
-                // We technically don't need to join the strings together as we just want the buffer
-                // which should be the same either way.
-                (isNullOrEmptyInput, withNewline)
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = {
-    withResource(cudf.Scalar.fromString(newlineStr)) { newline =>
-      withResource(cleaned.stringContains(newline)) { hasNewline =>
-        withResource(hasNewline.any()) { anyNewline =>
-          if (anyNewline.isValid && anyNewline.getBoolean) {
-            throw new IllegalArgumentException(
-              s"We cannot currently support parsing JSON that contains a $name in it")
-          }
-        }
-      }
-    }
-  }
-
   private lazy val parsedOptions = new JSONOptions(
     options,
     timeZoneId.get,
     SQLConf.get.columnNameOfCorruptRecord)
 
-  private lazy val jsonOptions =
-    GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
+  private lazy val jsonOptionBuilder =
+    GpuJsonReadCommon.cudfJsonOptionBuilder(parsedOptions)
 
   override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
-    schema match {
-      case _: MapType =>
-        JSONUtils.extractRawMapFromJsonString(input.getBase)
-      case struct: StructType => {
-        // if we ever need to support duplicate keys we need to keep track of the duplicates
-        // and make the first one null, but I don't think this will ever happen in practice
-        val cudfSchema = makeSchema(struct)
-
-        // We cannot handle all corner cases with this right now. The parser just isn't
-        // good enough, but we will try to handle a few common ones.
-        val numRows = input.getRowCount.toInt
-
-        // Step 1: verify and preprocess the data to clean it up and normalize a few things
-        // Step 2: Concat the data into a single buffer
-        val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
-        withResource(isNullOrEmpty) { isNullOrEmpty =>
-          // Step 3: setup a datasource
-          val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
-            // Step 4: Have cudf parse the JSON data
-            try {
-              cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
-            } catch {
-              case e : RuntimeException =>
-                throw new JsonParsingException("Currently some Json to Struct cases " +
-                  "are not supported. Consider to set spark.rapids.sql.expression.JsonToStructs" +
-                  "=false", e)
+    withResource(new NvtxRange("GpuJsonToStructs", NvtxColor.YELLOW)) { _ =>
+      schema match {
+        case _: MapType => JSONUtils.extractRawMapFromJsonString(input.getBase)
+        case struct: StructType =>
+          // if we ever need to support duplicate keys we need to keep track of the duplicates
+          // and make the first one null, but I don't think this will ever happen in practice
+          val cudfSchema = makeSchema(struct)
+
+          // We cannot handle all corner cases with this right now. The parser just isn't
+          // good enough, but we will try to handle a few common ones.
+          val numRows = input.getRowCount.toInt
+
+          // Step 1: Concatenate the data into a single buffer, tracking null/empty inputs
+          val concatenated = JSONUtils.concatenateJsonStrings(input.getBase)
+          withResource(concatenated) { _ =>
+            // Step 2: Set up a datasource from the concatenated JSON strings
+            val table = withResource(new JsonDeviceDataSource(concatenated.data)) { ds =>
+              withResource(new NvtxRange("Table.readJSON", NvtxColor.RED)) { _ =>
+                // Step 3: Have cudf parse the JSON data
+                try {
+                  cudf.Table.readJSON(cudfSchema,
+                    jsonOptionBuilder.withLineDelimiter(concatenated.delimiter).build(),
+                    ds,
+                    numRows)
+                } catch {
+                  case e: RuntimeException =>
+                    throw new JsonParsingException("Currently some JsonToStructs cases " +
+                      "are not supported. Consider setting " +
+                      "spark.rapids.sql.expression.JsonToStructs=false", e)
+                }
+              }
             }
-          }
-
-          // process duplicated field names in input struct schema
-
-          withResource(table) { _ =>
-            // Step 5: verify that the data looks correct
-            if (table.getRowCount != numRows) {
-              throw new IllegalStateException("The input data didn't parse correctly and we read " +
-                s"a different number of rows than was expected. Expected $numRows, " +
-                s"but got ${table.getRowCount}")
-            }
-
-            // Step 7: turn the data into a Struct
-            withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
-              withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
-                // Step 8: put nulls back in for nulls and empty strings
-                withResource(GpuScalar.from(null, struct)) { nullVal =>
-                  isNullOrEmpty.ifElse(nullVal, structData)
-                }
+
+            withResource(table) { _ =>
+              // Step 4: Verify that the data looks correct
+              if (table.getRowCount != numRows) {
+                throw new IllegalStateException("The input data didn't parse correctly and " +
+                  s"we read a different number of rows than was expected. Expected $numRows, " +
+                  s"but got ${table.getRowCount}")
+              }
+
+              // Step 5: Convert the read table into columns of desired types.
+              withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
+                // Step 6: Turn the data into structs.
+                JSONUtils.makeStructs(columns.asInstanceOf[Array[ColumnView]],
+                  concatenated.isNullOrEmpty)
              }
            }
          }
-        }
-      }
-      case _ => throw new IllegalArgumentException(
-        s"GpuJsonToStructs currently does not support schema of type $schema.")
+        case _ => throw new IllegalArgumentException(
+          s"GpuJsonToStructs currently does not support schema of type $schema.")
+      }
     }
   }
 }
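With these changes, `doColumnar` delegates whitespace stripping, null/empty tracking, and struct re-assembly to the JNI kernels (`JSONUtils.concatenateJsonStrings`, `JSONUtils.makeStructs`) instead of the removed cudf string-manipulation pass, and the per-batch delimiter feeds back into the reader options. A quick end-to-end exercise of the new path, mirroring the Python test above (assumes a SparkSession with the RAPIDS plugin enabled; not part of the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Inputs wrapped in whitespace, plus null-ish, empty, and invalid rows,
// similar to what the StringGen pattern in the new test produces.
val df = Seq("  {\"key\":\"abc\"}  ", " null ", "", "\tinvalid\r\n").toDF("value")

df.select(from_json(col("value"), new StructType().add("key", StringType))).show()
// The GPU results should match the CPU row for row, wrapping whitespace included.
```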