Skip to content

Commit

Permalink
Fix from_json function failure when input contains empty or null strings (#8526)
Browse files Browse the repository at this point in the history

* initial fix for from_json function

Signed-off-by: Cindy Jiang <[email protected]>

* cleaned up from_json tests and added one test for struct of struct

Signed-off-by: Cindy Jiang <[email protected]>

* addressed review comments

Signed-off-by: Cindy Jiang <[email protected]>

* add integration tests and workaround for lstrip and rstrip call sites

Signed-off-by: Cindy Jiang <[email protected]>

* add integration tests and fix for GpuStringTrim

Signed-off-by: Cindy Jiang <[email protected]>

* revert a fix for rstrip

Signed-off-by: Cindy Jiang <[email protected]>

* addressed review comments and updated tests

Signed-off-by: Cindy Jiang <[email protected]>

---------

Signed-off-by: Cindy Jiang <[email protected]>
  • Loading branch information
cindyyuanjiang authored Jun 9, 2023
1 parent 03576ec commit 8f967e2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 47 deletions.
61 changes: 34 additions & 27 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,41 +380,48 @@ def test_from_json_map_fallback():
'JsonToStructs',
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1234}')])
@pytest.mark.parametrize('schema', [StructType([StructField("a", StringType())]),
StructType([StructField("d", StringType())]),
StructType([StructField("a", StringType()), StructField("b", StringType())]),
StructType([StructField("c", IntegerType()), StructField("a", StringType())]),
StructType([StructField("a", StringType()), StructField("a", StringType())])
@pytest.mark.parametrize('schema', ['struct<a:string>',
'struct<d:string>',
'struct<a:string,b:string>',
'struct<c:int,a:string>',
'struct<a:string,a:string>',
])
def test_from_json_struct(data_gen, schema):
def test_from_json_struct(schema):
json_string_gen = StringGen(r'{"a": "[0-9]{0,5}", "b": "[A-Z]{0,5}", "c": 1\d\d\d}').with_special_pattern('', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": {"name": "Bob", "age": 20}}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
StructType([StructField("student", StructType([StructField("name", StringType()), \
StructField("age", IntegerType())]))])])
def test_from_json_struct_of_struct(data_gen, schema):
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:struct<name:string,age:int>>',
'struct<teacher:string,student:struct<name:string,age:int>>'])
def test_from_json_struct_of_struct(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": {"name": "[A-Z]{1}[a-z]{2,5}", "age": 1\d}}').with_special_pattern('', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('data_gen', [StringGen(r'{"teacher": "Alice", "student": \[{"name": "Bob", "class": "junior"},' \
r'{"name": "Charlie", "class": "freshman"}\]}')])
@pytest.mark.parametrize('schema', [StructType([StructField("teacher", StringType())]),
StructType([StructField("student", ArrayType(StructType([StructField("name", StringType()), \
StructField("class", StringType())])))]),
StructType([StructField("teacher", StringType()), \
StructField("student", ArrayType(StructType([StructField("name", StringType()), \
StructField("class", StringType())])))])])
def test_from_json_struct_of_list(data_gen, schema):
@pytest.mark.parametrize('schema', ['struct<teacher:string>',
'struct<student:array<struct<name:string,class:string>>>',
'struct<teacher:string,student:array<struct<name:string,class:string>>>'])
def test_from_json_struct_of_list(schema):
json_string_gen = StringGen(r'{"teacher": "[A-Z]{1}[a-z]{2,5}",' \
r'"student": \[{"name": "[A-Z]{1}[a-z]{2,5}", "class": "junior"},' \
r'{"name": "[A-Z]{1}[a-z]{2,5}", "class": "freshman"}\]}').with_special_pattern('', weight=50)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen) \
.select(f.from_json(f.col('a'), schema)),
lambda spark : unary_op_df(spark, json_string_gen) \
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
def test_from_json_struct_all_empty_string_input(schema):
    # Regression test: every input row is the empty string, which previously
    # caused from_json to fail on the GPU (see commit title).
    empty_gen = StringGen('')
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, empty_gen).select(f.from_json('a', schema)),
        conf={"spark.rapids.sql.expression.JsonToStructs": True})

@allow_non_gpu('FileSourceScanExec')
Expand Down
18 changes: 9 additions & 9 deletions integration_tests/src/main/python/string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,30 +148,30 @@ def test_contains():
f.col('a').contains(None)
))

def test_trim():
gen = mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}')
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_trim(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
'TRIM(a)',
'TRIM("Ab" FROM a)',
'TRIM("A\ud720" FROM a)',
'TRIM(BOTH NULL FROM a)',
'TRIM("" FROM a)'))

def test_ltrim():
gen = mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}')
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_ltrim(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
'LTRIM(a)',
'LTRIM("Ab", a)',
'TRIM(LEADING "A\ud720" FROM a)',
'TRIM(LEADING NULL FROM a)',
'TRIM(LEADING "" FROM a)'))

def test_rtrim():
gen = mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}')
@pytest.mark.parametrize('data_gen', [mk_str_gen('[Ab \ud720]{0,3}A.{0,3}Z[ Ab]{0,3}'), StringGen('')])
def test_rtrim(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
lambda spark: unary_op_df(spark, data_gen).selectExpr(
'RTRIM(a)',
'RTRIM("Ab", a)',
'TRIM(TRAILING "A\ud720" FROM a)',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,40 @@ case class GpuJsonToStructs(
timeZoneId: Option[String] = None)
extends GpuUnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes
with NullIntolerant {

private def constructEmptyRow(schema: DataType): String = {
schema match {
case struct: StructType if (struct.fields.length > 0) => {
val jsonFields: Array[String] = struct.fields.map { field =>
field.dataType match {
case IntegerType => s""""${field.name}": 0"""
case StringType => s""""${field.name}": """""
case s: StructType => s""""${field.name}": ${constructEmptyRow(s)}"""
case a: ArrayType => s""""${field.name}": ${constructEmptyRow(a)}"""
case t => throw new IllegalArgumentException("GpuJsonToStructs currently" +
s"does not support input schema with type $t.")
}
}
jsonFields.mkString("{", ", ", "}")
}
case array: ArrayType => s"[${constructEmptyRow(array.elementType)}]"
case _ => "{}"
}
}

lazy val emptyRowStr = constructEmptyRow(schema)

private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) ={
withResource(cudf.Scalar.fromString("{}")) { emptyRow =>
val stripped = withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
private def cleanAndConcat(input: cudf.ColumnVector): (cudf.ColumnVector, cudf.ColumnVector) = {
withResource(cudf.Scalar.fromString(emptyRowStr)) { emptyRow =>

val stripped = if (input.getData == null) {
input.incRefCount
} else {
withResource(cudf.Scalar.fromString(" ")) { space =>
input.strip(space)
}
}

withResource(stripped) { stripped =>
val isNullOrEmptyInput = withResource(input.isNull) { isNull =>
val isEmpty = withResource(stripped.getCharLengths) { lengths =>
Expand Down Expand Up @@ -86,14 +114,14 @@ case class GpuJsonToStructs(
// Output = [(null, StringType), ("b", StringType), ("a", IntegerType)]
private def processFieldNames(names: Seq[(String, DataType)]): Seq[(String, DataType)] = {
val zero = (Set.empty[String], Seq.empty[(String, DataType)])
val (_, res) = names.foldRight(zero) { case ((name, dtype), (existingNames, acc)) =>
val (_, resultFields) = names.foldRight (zero) { case ((name, dtype), (existingNames, acc)) =>
if (existingNames(name)) {
(existingNames, (null, dtype) +: acc)
} else {
(existingNames + name, (name, dtype) +: acc)
}
}
res
resultFields
}

// Given a cudf column, return its Spark type
Expand Down Expand Up @@ -151,8 +179,7 @@ case class GpuJsonToStructs(
}

// process duplicated field names in input struct schema
val fieldNames = processFieldNames(struct.fields.map { field =>
(field.name, field.dataType)})
val fieldNames = processFieldNames(struct.fields.map (f => (f.name, f.dataType)))

withResource(rawTable) { rawTable =>
// Step 5: verify that the data looks correct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ case class GpuStringTrim(column: Expression, trimParameters: Option[Expression]
val trimMethod = "gpuTrim"

override def strippedColumnVector(column: GpuColumnVector, t: Scalar): GpuColumnVector =
GpuColumnVector.from(column.getBase.strip(t), dataType)
if (column.getBase.getData == null) {
GpuColumnVector.from(column.getBase.incRefCount, dataType)
} else {
GpuColumnVector.from(column.getBase.strip(t), dataType)
}
}

case class GpuStringTrimLeft(column: Expression, trimParameters: Option[Expression] = None)
Expand All @@ -283,7 +287,11 @@ case class GpuStringTrimLeft(column: Expression, trimParameters: Option[Expressi
val trimMethod = "gpuTrimLeft"

override def strippedColumnVector(column: GpuColumnVector, t: Scalar): GpuColumnVector =
GpuColumnVector.from(column.getBase.lstrip(t), dataType)
if (column.getBase.getData == null) {
GpuColumnVector.from(column.getBase.incRefCount, dataType)
} else {
GpuColumnVector.from(column.getBase.lstrip(t), dataType)
}
}

case class GpuStringTrimRight(column: Expression, trimParameters: Option[Expression] = None)
Expand All @@ -303,7 +311,11 @@ case class GpuStringTrimRight(column: Expression, trimParameters: Option[Express
val trimMethod = "gpuTrimRight"

override def strippedColumnVector(column:GpuColumnVector, t:Scalar): GpuColumnVector =
GpuColumnVector.from(column.getBase.rstrip(t), dataType)
if (column.getBase.getData == null) {
GpuColumnVector.from(column.getBase.incRefCount, dataType)
} else {
GpuColumnVector.from(column.getBase.rstrip(t), dataType)
}
}

case class GpuConcatWs(children: Seq[Expression])
Expand Down

0 comments on commit 8f967e2

Please sign in to comment.