Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix from_json function failure when input contains empty or null strings #8526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,41 +380,48 @@ def test_from_json_map_fallback():
'JsonToStructs',
conf={"spark.rapids.sql.expression.JsonToStructs": True})

# Schemas are given as DDL strings; they exercise present, missing,
# reordered, and duplicated field names against the generated JSON.
@pytest.mark.parametrize('schema', ['struct<a:string>',
                                    'struct<d:string>',
                                    'struct<a:string,b:string>',
                                    'struct<c:int,a:string>',
                                    'struct<a:string,a:string>',
                                    ])
def test_from_json_struct(schema):
    # with_special_pattern('', weight=50) mixes empty strings into the input to
    # exercise the null/empty-row handling path on the GPU.
    json_string_gen = StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1\d\d\d}').with_special_pattern('', weight=50)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.from_json('a', schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

# Nested-struct schemas: outer field only, inner struct only, and both.
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
                                    'struct<student:struct<name:string,age:int>>',
                                    'struct<teacher:string,student:struct<name:string,age:int>>'])
def test_from_json_struct_of_struct(schema):
    # Empty strings are mixed in (weight=50) to exercise the null/empty-row path.
    json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
        r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}').with_special_pattern('', weight=50)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.from_json('a', schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

# Struct containing an array-of-struct field: plain field, array field, and both.
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
                                    'struct<student:array<struct<name:string,class:string>>>',
                                    'struct<teacher:string,student:array<struct<name:string,class:string>>>'])
def test_from_json_struct_of_list(schema):
    # Empty strings are mixed in (weight=50) to exercise the null/empty-row path.
    json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
        r'"student": \[{"name": "[A-Z]{1}[a-z]{2,5}", "class": "junior"},' \
        r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}').with_special_pattern('', weight=50)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : unary_op_df(spark, json_string_gen) \
            .select(f.from_json('a', schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
def test_from_json_struct_all_empty_string_input(schema):
    # Every generated row is the empty string, so the whole input column has no
    # string data at all; from_json must still produce matching CPU/GPU results.
    empty_gen = StringGen('')

    def do_from_json(spark):
        df = unary_op_df(spark, empty_gen)
        return df.select(f.from_json('a', schema))

    assert_gpu_and_cpu_are_equal_collect(
        do_from_json,
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@allow_non_gpu('FileSourceScanExec')
Expand Down
18 changes: 9 additions & 9 deletions integration_tests/src/main/python/string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,30 +148,30 @@ def test_contains():
f.col('a').contains(None)
))

# Parametrized over a normal mixed-content generator AND an all-empty-string
# generator; the latter covers columns whose data buffer is empty.
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_trim(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).selectExpr(
            'TRIM(a)',
            'TRIM("Ab" FROM a)',
            'TRIM("A\ud720" FROM a)',
            'TRIM(BOTH NULL FROM a)',
            'TRIM("" FROM a)'))

# Same parametrization as test_trim: a mixed-content generator plus an
# all-empty-string generator to cover empty string columns.
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_ltrim(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, data_gen).selectExpr(
            'LTRIM(a)',
            'LTRIM("Ab", a)',
            'TRIM(LEADING "A\ud720" FROM a)',
            'TRIM(LEADING NULL FROM a)',
            'TRIM(LEADING "" FROM a)'))

def test_rtrim():
gen = mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}')
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_rtrim(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
'RTRIM(a)',
'RTRIM("Ab", a)',
'TRIM(TRAILING "A\ud720" FROM a)',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,40 @@ case class GpuJsonToStructs(
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {

/**
 * Builds a JSON literal used as a stand-in for null/empty input rows so that the
 * downstream JSON reader still sees every column the output schema expects.
 *
 * NOTE(review): the placeholder values (0 for int fields, "" for string fields) are
 * presumably masked out later using the null/empty-row mask computed in
 * cleanAndConcat — confirm Spark-compatible nulls are produced for these rows.
 * NOTE(review): field names are embedded unescaped, so a name containing '"' or '\'
 * would produce invalid JSON — tracked separately (#8542).
 *
 * @param schema the (possibly nested) output data type
 * @return a JSON string with one entry per schema field
 */
private def constructEmptyRow(schema: DataType): String = {
  schema match {
    case struct: StructType if struct.fields.nonEmpty =>
      val jsonFields: Array[String] = struct.fields.map { field =>
        field.dataType match {
          case IntegerType => s""""${field.name}": 0"""
          case StringType => s""""${field.name}": """""
          case s: StructType => s""""${field.name}": ${constructEmptyRow(s)}"""
          case a: ArrayType => s""""${field.name}": ${constructEmptyRow(a)}"""
          // Fixed: the original concatenation was missing a space, producing
          // "currentlydoes not support" in the error message.
          case t => throw new IllegalArgumentException("GpuJsonToStructs currently " +
            s"does not support input schema with type $t.")
        }
      }
      jsonFields.mkString("{", ", ", "}")
    // An array is represented by a single empty element of its element type.
    case array: ArrayType => s"[${constructEmptyRow(array.elementType)}]"
    // Fallback for non-struct, non-array schemas: an empty JSON object.
    case _ => "{}"
  }
}

// JSON text substituted for null/empty input rows; depends only on the fixed
// output schema, so it is computed at most once.
lazy val emptyRowStr = constructEmptyRow(schema)

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
withResource(cudf.Scalar.fromString(emptyRowStr)) { emptyRow =>

val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we look for other places that strip, rstrip, and lstrip are called? I did a quick look and I think we might get the same problem on


GpuColumnVector.from(column.getBase.rstrip(t), dataType)

GpuColumnVector.from(column.getBase.lstrip(t), dataType)

But there may be others.

At a minimum we need to test these too and fix them if they are also broken

Copy link
Collaborator Author

@cindyyuanjiang cindyyuanjiang Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@revans2 Thanks! I tested GpuStringTrim[Left, Right] implementation above. The same problem comes up. I added integration tests and fixes for them.


I didn't find tests for the above rstrip() call site, and haven't added a fix yet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is fine. That is specifically for a special case ORC char type. We should file an issue for it just so it is not missed, but it should be really rare.

}
}

withResource(stripped) { stripped =>
val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
val isEmpty = withResource(stripped.getCharLengths) { lengths =>
Expand Down Expand Up @@ -86,14 +114,14 @@ case class GpuJsonToStructs(
// Output = [(null, StringType), ("b", StringType), ("a", IntegerType)]
// Folds from the right so that for a duplicated field name only the LAST
// occurrence keeps its name; earlier occurrences get a null name while their
// declared type and position are preserved.
// Example: [("a", Int), ("b", String), ("a", String)]
//       -> [(null, Int), ("b", String), ("a", String)]
private def processFieldNames(names: Seq[(String, DataType)]): Seq[(String, DataType)] = {
  val zero = (Set.empty[String], Seq.empty[(String, DataType)])
  val (_, resultFields) = names.foldRight(zero) { case ((name, dtype), (existingNames, acc)) =>
    if (existingNames(name)) {
      // Name already seen to the right: blank it out but keep the slot.
      (existingNames, (null, dtype) +: acc)
    } else {
      (existingNames + name, (name, dtype) +: acc)
    }
  }
  resultFields
}

// Given a cudf column, return its Spark type
Expand Down Expand Up @@ -151,8 +179,7 @@ case class GpuJsonToStructs(
}

// process duplicated field names in input struct schema
val fieldNames = processFieldNames(struct.fields.map { field =>
(field.name, field.dataType)})
val fieldNames = processFieldNames(struct.fields.map (f => (f.name, f.dataType)))

withResource(rawTable) { rawTable =>
// Step 5: verify that the data looks correct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ case class GpuStringTrim(column: Expression, trimParameters: Option[Expression]
val trimMethod = "gpuTrim"

// When the column's data buffer is null (e.g. every row is an empty string),
// strip() fails (see #8526), so pass the column through unchanged — trimming
// has no effect on empty strings anyway.
override def strippedColumnVector(column: GpuColumnVector, t: Scalar): GpuColumnVector =
  if (column.getBase.getData == null) {
    GpuColumnVector.from(column.getBase.incRefCount, dataType)
  } else {
    GpuColumnVector.from(column.getBase.strip(t), dataType)
  }
}

case class GpuStringTrimLeft(column: Expression, trimParameters: Option[Expression] = None)
Expand All @@ -283,7 +287,11 @@ case class GpuStringTrimLeft(column: Expression, trimParameters: Option[Expressi
val trimMethod = "gpuTrimLeft"

// When the column's data buffer is null (e.g. every row is an empty string),
// lstrip() fails (see #8526), so pass the column through unchanged — trimming
// has no effect on empty strings anyway.
override def strippedColumnVector(column: GpuColumnVector, t: Scalar): GpuColumnVector =
  if (column.getBase.getData == null) {
    GpuColumnVector.from(column.getBase.incRefCount, dataType)
  } else {
    GpuColumnVector.from(column.getBase.lstrip(t), dataType)
  }
}

case class GpuStringTrimRight(column: Expression, trimParameters: Option[Expression] = None)
Expand All @@ -303,7 +311,11 @@ case class GpuStringTrimRight(column: Expression, trimParameters: Option[Express
val trimMethod = "gpuTrimRight"

// When the column's data buffer is null (e.g. every row is an empty string),
// rstrip() fails (see #8526), so pass the column through unchanged — trimming
// has no effect on empty strings anyway.
override def strippedColumnVector(column: GpuColumnVector, t: Scalar): GpuColumnVector =
  if (column.getBase.getData == null) {
    GpuColumnVector.from(column.getBase.incRefCount, dataType)
  } else {
    GpuColumnVector.from(column.getBase.rstrip(t), dataType)
  }
}

case class GpuConcatWs(children: Seq[Expression])
Expand Down