Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update from_json to use null as line delimiter #11499

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions integration_tests/src/main/python/json_matrix_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,38 @@ def test_json_tuple_dec_locale_non_aribic(std_input_path):
lambda spark : read_json_as_text(spark, std_input_path + '/' + WITH_DEC_LOCALE_NON_ARIBIC_FILE, "json").selectExpr('''json_tuple(json, "data")'''),
conf =_enable_json_tuple_conf)

def json_df_with_whitespace(spark):
    """Return a single-column DataFrame (column 'json', string type) whose rows
    mix whitespace into JSON in several distinct ways: whitespace between a key
    and its value, rows consisting only of whitespace, and whitespace embedded
    inside quoted string values. Shared fixture for the whitespace-handling
    tests below."""
    # Each entry becomes one row of the 'json' column.
    docs = [
        '{"a":\n100}',    # whitespace between ':' and the value
        '{"a":\r101}',
        '{"a":\t102}',
        '{"a": 102}',
        '\n',             # rows that are nothing but whitespace
        '\r',
        '\t',
        ' ',
        '\r\n\t ',
        '{"a":"\r200"}',  # whitespace embedded inside a quoted string value
        '{"a":"\n201"}',
        '{"a":"\t202"}',
        '{"a":" 202"}',
    ]
    schema = StructType([StructField("json", StringType())])
    return spark.createDataFrame([[doc] for doc in docs], schema)

@pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/16915')
def test_from_json_with_whitespace():
    # from_json over whitespace-laden input must produce the same results on
    # CPU and GPU. Currently expected to fail; tracked by the cudf issue above.
    def do_read(spark):
        return json_df_with_whitespace(spark).selectExpr(
            '*', 'from_json(json, "a STRING")')
    assert_gpu_and_cpu_are_equal_collect(do_read, conf=_enable_json_to_structs_conf)

def test_get_json_object_with_whitespace():
    # get_json_object should agree between CPU and GPU when the input JSON
    # contains whitespace in and around values.
    def do_read(spark):
        df = json_df_with_whitespace(spark)
        return df.selectExpr('*', 'get_json_object(json, "$.a")')
    assert_gpu_and_cpu_are_equal_collect(do_read)

def test_json_tuple_with_whitespace():
    # json_tuple over whitespace-laden JSON must match between CPU and GPU.
    def do_read(spark):
        return json_df_with_whitespace(spark).selectExpr('*', 'json_tuple(json, "a")')
    assert_gpu_and_cpu_are_equal_collect(do_read, conf=_enable_json_tuple_conf)

# These are common files used by most of the tests. A few files are for specific types, but these are very targeted tests
COMMON_TEST_FILES=[
"int_formatted.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class JsonPartitionReader(
maxBytesPerChunk, execMetrics, FilterEmptyHostLineBuffererFactory) {

def buildJsonOptions(parsedOptions: JSONOptions): cudf.JSONOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, None)

/**
* Read the host buffer to GPU table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,23 @@ object GpuJsonReadCommon {
}
}

def cudfJsonOptions(options: JSONOptions): ai.rapids.cudf.JSONOptions = {
def cudfJsonOptions(options: JSONOptions,
delimOverride: Option[Char]): ai.rapids.cudf.JSONOptions = {
// This is really ugly, but options.allowUnquotedControlChars is marked as private
// and this is the only way I know to get it without even uglier tricks
@scala.annotation.nowarn("msg=Java enum ALLOW_UNQUOTED_CONTROL_CHARS in " +
"Java enum Feature is deprecated")
val allowUnquotedControlChars =
options.buildJsonFactory()
.isEnabled(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS)
val lineDelimiter = delimOverride.getOrElse {
options.lineSeparatorInRead.map { sep =>
if (sep.length > 1) {
throw new IllegalArgumentException("Only 1 byte separators are supported")
}
sep(0).toChar
} .getOrElse('\n')
}
ai.rapids.cudf.JSONOptions.builder()
.withRecoverWithNull(true)
.withMixedTypesAsStrings(true)
Expand All @@ -336,6 +345,7 @@ object GpuJsonReadCommon {
.withLeadingZeros(options.allowNumericLeadingZeros)
.withNonNumericNumbers(options.allowNonNumericNumbers)
.withUnquotedControlChars(allowUnquotedControlChars)
.withLineDelimiter(lineDelimiter)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, Scalar}
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, ColumnView, Cuda, DataSource, DeviceMemoryBuffer, HostMemoryBuffer, RegexProgram, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuScalar, GpuUnaryExpression, HostAlloc}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.jni.JSONUtils
Expand Down Expand Up @@ -81,6 +81,9 @@ case class GpuJsonToStructs(
import GpuJsonReadCommon._

private lazy val emptyRowStr = constructEmptyRow(schema)
// the nul char should be very rare and make it so we never really have to worry
// about it showing up in real world data.
private val lineDelimiter = '\u0000'

private def constructEmptyRow(schema: DataType): String = {
schema match {
Expand All @@ -94,8 +97,10 @@ case class GpuJsonToStructs(
val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
// TODO is there a way to do this without a regexp
withResource(cudf.Scalar.fromString("")) { empty =>
val prog = new RegexProgram("(?:^[ \t\r\n]+)|(?:[ \t\r\n]+$)")
input.replaceRegex(prog, empty)
}
}

Expand All @@ -119,11 +124,11 @@ case class GpuJsonToStructs(
}
withResource(isLiteralNull) { _ =>
withResource(isLiteralNull.ifElse(emptyRow, nullsReplaced)) { cleaned =>
checkForNewline(cleaned, "\n", "line separator")
checkForNewline(cleaned, "\r", "carriage return")
checkForLineDelimiter(cleaned)

// add a newline to each JSON line
val withNewline = withResource(cudf.Scalar.fromString("\n")) { lineSep =>
val withNewline = withResource(
cudf.Scalar.fromString("" + lineDelimiter)) { lineSep =>
withResource(ColumnVector.fromScalar(lineSep, cleaned.getRowCount.toInt)) {
newLineCol =>
ColumnVector.stringConcatenate(Array[ColumnView](cleaned, newLineCol))
Expand All @@ -141,13 +146,13 @@ case class GpuJsonToStructs(
}
}

private def checkForNewline(cleaned: ColumnVector, newlineStr: String, name: String): Unit = {
withResource(cudf.Scalar.fromString(newlineStr)) { newline =>
withResource(cleaned.stringContains(newline)) { hasNewline =>
withResource(hasNewline.any()) { anyNewline =>
if (anyNewline.isValid && anyNewline.getBoolean) {
private def checkForLineDelimiter(cleaned: ColumnVector): Unit = {
withResource(cudf.Scalar.fromString("" + lineDelimiter)) { delimiter =>
withResource(cleaned.stringContains(delimiter)) { hasDelimiter =>
withResource(hasDelimiter.any()) { anyDelimiter =>
if (anyDelimiter.isValid && anyDelimiter.getBoolean) {
throw new IllegalArgumentException(
s"We cannot currently support parsing JSON that contains a $name in it")
s"We cannot currently support parsing JSON that contains a NUL character in it")
}
}
}
Expand All @@ -160,7 +165,7 @@ case class GpuJsonToStructs(
SQLConf.get.columnNameOfCorruptRecord)

private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)
GpuJsonReadCommon.cudfJsonOptions(parsedOptions, Some(lineDelimiter))

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
Expand Down
Loading