Skip to content

Commit

Permalink
Adopt JSONUtils.concatenateJsonStrings for concatenating JSON strin…
Browse files Browse the repository at this point in the history
…gs (#11549)

* Using the new `concat_json` JNI function

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Cleanup

Signed-off-by: Nghia Truong <[email protected]>

* Fix style

Signed-off-by: Nghia Truong <[email protected]>

* Add test

Signed-off-by: Nghia Truong <[email protected]>

* Fix null replacement order

Signed-off-by: Nghia Truong <[email protected]>

* Update test

Signed-off-by: Nghia Truong <[email protected]>

* Rename variable

Signed-off-by: Nghia Truong <[email protected]>

* Update test

Signed-off-by: Nghia Truong <[email protected]>

* Use `JSONUtils.makeStructs`

Signed-off-by: Nghia Truong <[email protected]>

* Add NvtxRange

Signed-off-by: Nghia Truong <[email protected]>

* Fix style

Signed-off-by: Nghia Truong <[email protected]>

* Update test

Signed-off-by: Nghia Truong <[email protected]>

---------

Signed-off-by: Nghia Truong <[email protected]>
  • Loading branch information
ttnghia authored Oct 15, 2024
1 parent 11964ae commit 0510a78
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 141 deletions.
8 changes: 8 additions & 0 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1454,3 +1454,11 @@ def test_spark_from_json_invalid_json():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.createDataFrame(data, 'json STRING').select(f.col('json'), f.from_json(f.col('json'), schema)),
conf =_enable_all_types_conf)

@allow_non_gpu(*non_utc_allow)
def test_from_json_input_wrapped_in_whitespaces():
json_string_gen = StringGen(r'[ \r\n\t]{0,5}({"key":( |\r|\n|\t|)"[A-Za-z]{0,5}"}|null|invalid|)[ \r\n\t]{0,5}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', 'struct<key:string>')),
conf=_enable_all_types_conf)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids

import java.util.Locale

import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar, Schema, Table}
import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, NvtxColor, NvtxRange, Scalar, Schema, Table}
import com.fasterxml.jackson.core.JsonParser
import com.nvidia.spark.rapids.{ColumnCastUtil, GpuCast, GpuColumnVector, GpuScalar, GpuTextBasedPartitionReader}
import com.nvidia.spark.rapids.Arm.withResource
Expand Down Expand Up @@ -311,21 +311,26 @@ object GpuJsonReadCommon {
def convertTableToDesiredType(table: Table,
desired: StructType,
options: JSONOptions): Array[ColumnVector] = {
val dataTypes = desired.fields.map(_.dataType)
dataTypes.zipWithIndex.safeMap {
case (dt, i) =>
convertToDesiredType(table.getColumn(i), dt, options)
withResource(new NvtxRange("convertTableToDesiredType", NvtxColor.RED)) { _ =>
val dataTypes = desired.fields.map(_.dataType)
dataTypes.zipWithIndex.safeMap {
case (dt, i) =>
convertToDesiredType(table.getColumn(i), dt, options)
}
}
}

def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions =
cudfJsonOptionBuilder(options).build()

def cudfJsonOptionBuilder(options: JSONOptions): ai.rapids.cudf.JSONOptions.Builder = {
// This is really ugly, but options.allowUnquotedControlChars is marked as private
// and this is the only way I know to get it without even uglier tricks
@scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
"Java enum Feature is deprecated")
val allowUnquotedControlChars =
options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
val allowUnquotedControlChars = options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)

ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(true)
Expand All @@ -338,6 +343,5 @@ object GpuJsonReadCommon {
.withUnquotedControlChars(allowUnquotedControlChars)
.withCudfPruneSchema(true)
.withExperimental(true)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc}
import ai.rapids.cudf.{ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression, HostAlloc}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.JSONUtils
import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.json.JSONOptions
Expand All @@ -34,8 +33,7 @@ import org.apache.spark.sql.types._
*/
class JsonParsingException(s: String, cause: Throwable) extends RuntimeException(s, cause) {}

class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
lazy val data: BaseDeviceMemoryBuffer = combined.getData
class JsonDeviceDataSource(data: DeviceMemoryBuffer) extends DataSource {
lazy val totalSize: Long = data.getLength
override def size(): Long = totalSize

Expand Down Expand Up @@ -64,11 +62,6 @@ class JsonDeviceDataSource(combined: ColumnVector) extends DataSource {
dest.copyFromDeviceBufferAsync(0, data, offset, length, stream)
length
}

override def close(): Unit = {
combined.close()
super.close()
}
}

case class GpuJsonToStructs(
Expand All @@ -80,142 +73,67 @@ case class GpuJsonToStructs(
with NullIntolerant {
import GpuJsonReadCommon._

private lazy val emptyRowStr = constructEmptyRow(schema)

private def constructEmptyRow(schema: DataType): String = {
schema match {
case struct: StructType if struct.fields.nonEmpty =>
s"""{"${StringEscapeUtils.escapeJson(struct.head.name)}":null}"""
case other =>
throw new IllegalArgumentException(s"$other is not supported as a top level type") }
}

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
}

withResource(stripped) { stripped =>
val isEmpty = withResource(stripped.getByteCount) { lengths =>
withResource(cudf.Scalar.fromInt(0)) { zero =>
lengths.lessOrEqualTo(zero)
}
}
val isNullOrEmptyInput = withResource(isEmpty) { _ =>
withResource(input.isNull) { isNull =>
isNull.binaryOp(cudf.BinaryOp.NULL_LOGICAL_OR, isEmpty, cudf.DType.BOOL8)
}
}
closeOnExcept(isNullOrEmptyInput) { _ =>
withResource(cudf.Scalar.fromString(emptyRowStr)) { emptyRow =>
// TODO is it worth checking if any are empty or null and then skipping this?
withResource(isNullOrEmptyInput.ifElse(emptyRow, stripped)) { nullsReplaced =>
val isLiteralNull = withResource(Scalar.fromString("null")) { literalNull =>
nullsReplaced.equalTo(literalNull)
}
withResource(isLiteralNull) { _ =>
withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
checkForNewline(cleaned, "\n", "line separator")
checkForNewline(cleaned, "\r", "carriage return")

// add a newline to each JSON line
val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep =>
withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) {
newLineCol =>
ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol))
}
}

// We technically don't need to join the strings together as we just want the buffer
// which should be the same either way.
(isNullOrEmptyInput, withNewline)
}
}
}
}
}
}
}

private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = {
withResource(cudf.Scalar.fromString(newlineStr)) { newline =>
withResource(cleaned.stringContains(newline)) { hasNewline =>
withResource(hasNewline.any()) { anyNewline =>
if (anyNewline.isValid && anyNewline.getBoolean) {
throw new IllegalArgumentException(
s"We cannot currently support parsing JSON that contains a $name in it")
}
}
}
}
}

private lazy val parsedOptions = new JSONOptions(
options,
timeZoneId.get,
SQLConf.get.columnNameOfCorruptRecord)

private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
private lazy val jsonOptionBuilder =
GpuJsonReadCommon.cudfJsonOptionBuilder(parsedOptions)

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
case _: MapType =>
JSONUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
// if we ever need to support duplicate keys we need to keep track of the duplicates
// and make the first one null, but I don't think this will ever happen in practice
val cudfSchema = makeSchema(struct)

// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: setup a datasource
val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
// Step 4: Have cudf parse the JSON data
try {
cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
} catch {
case e : RuntimeException =>
throw new JsonParsingException("Currently some Json to Struct cases " +
"are not supported. Consider to set spark.rapids.sql.expression.JsonToStructs" +
"=false", e)
withResource(new NvtxRange("GpuJsonToStructs", NvtxColor.YELLOW)) { _ =>
schema match {
case _: MapType => JSONUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType =>
// if we ever need to support duplicate keys we need to keep track of the duplicates
// and make the first one null, but I don't think this will ever happen in practice
val cudfSchema = makeSchema(struct)

// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: Concat the data into a single buffer, with verifying nulls/empty strings
val concatenated = JSONUtils.concatenateJsonStrings(input.getBase)
withResource(concatenated) { _ =>
// Step 2: Setup a datasource from the concatenated JSON strings
val table = withResource(new JsonDeviceDataSource(concatenated.data)) { ds =>
withResource(new NvtxRange("Table.readJSON", NvtxColor.RED)) { _ =>
// Step 3: Have cudf parse the JSON data
try {
cudf.Table.readJSON(cudfSchema,
jsonOptionBuilder.withLineDelimiter(concatenated.delimiter).build(),
ds,
numRows)
} catch {
case e: RuntimeException =>
throw new JsonParsingException("Currently some JsonToStructs cases " +
"are not supported. " +
"Consider to set spark.rapids.sql.expression.JsonToStructs=false", e)
}
}
}
}

// process duplicated field names in input struct schema

withResource(table) { _ =>
// Step 5: verify that the data looks correct
if (table.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
withResource(table) { _ =>
// Step 4: Verify that the data looks correct
if (table.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and " +
s"we read a different number of rows than was expected. Expected $numRows, " +
s"but got ${table.getRowCount}")
}
}

// Step 7: turn the data into a Struct
withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
}
// Step 5: Convert the read table into columns of desired types.
withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
// Step 6: Turn the data into structs.
JSONUtils.makeStructs(columns.asInstanceOf[Array[ColumnView]],
concatenated.isNullOrEmpty)
}
}
}
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type $schema.")
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type $schema.")
}
}

Expand Down

0 comments on commit 0510a78

Please sign in to comment.