Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GpuJsonToStructs to use the new JNI kernel when the input schema is StructType #11348

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import org.apache.spark.sql.types.{DataType, _}
* ScanJson
*/
object GpuJsonReadCommon {
private def populateSchema(dt: DataType,
name: String, builder: Schema.Builder): Unit = dt match {
private def populateSchema(dt: DataType, name: String, builder: Schema.Builder): Unit =
dt match {
case at: ArrayType =>
val child = builder.addColumn(DType.LIST, name)
populateSchema(at.elementType, "element", child)
Expand All @@ -47,6 +47,9 @@ object GpuJsonReadCommon {
}
case _: MapType =>
throw new IllegalArgumentException("MapType is not supported yet for schema conversion")
case ShortType | IntegerType | LongType | FloatType | DoubleType | ByteType | BooleanType |
DateType | TimestampType | _: DecimalType =>
builder.addColumn(GpuColumnVector.getNonNestedRapidsType(dt), name)
case _ =>
builder.addColumn(DType.STRING, name)
}
Expand Down Expand Up @@ -90,6 +93,8 @@ object GpuJsonReadCommon {
}

private def undoKeepQuotes(input: ColumnView): ColumnVector = {
// TableDebug.get.debug("input", input)

withResource(isQuotedString(input)) { iq =>
withResource(stripFirstAndLastChar(input)) { stripped =>
iq.ifElse(stripped, input)
Expand Down Expand Up @@ -266,7 +271,8 @@ object GpuJsonReadCommon {

private def convertToDesiredType(inputCv: ColumnVector,
topLevelType: DataType,
options: JSONOptions): ColumnVector = {
options: JSONOptions,
removeQuotes: Boolean): ColumnVector = {
ColumnCastUtil.deepTransform(inputCv, Some(topLevelType),
Some(nestedColumnViewMismatchTransform)) {
case (cv, Some(BooleanType)) if cv.getType == DType.STRING =>
Expand All @@ -281,8 +287,8 @@ object GpuJsonReadCommon {
GpuTextBasedPartitionReader.castStringToTimestamp(fixed, timestampFormat(options),
DType.TIMESTAMP_MICROSECONDS)
}
case (cv, Some(StringType)) if cv.getType == DType.STRING =>
undoKeepQuotes(cv)
case (cv, Some(StringType)) if cv.getType == DType.STRING && removeQuotes =>
undoKeepQuotes(cv)
case (cv, Some(dt: DecimalType)) if cv.getType == DType.STRING =>
withResource(sanitizeDecimal(cv, options)) { tmp =>
castStringToDecimal(tmp, dt)
Expand Down Expand Up @@ -310,11 +316,12 @@ object GpuJsonReadCommon {
*/
def convertTableToDesiredType(table: Table,
desired: StructType,
options: JSONOptions): Array[ColumnVector] = {
options: JSONOptions,
removeQuotes: Boolean = true): Array[ColumnVector] = {
val dataTypes = desired.fields.map(_.dataType)
dataTypes.zipWithIndex.safeMap {
case (dt, i) =>
convertToDesiredType(table.getColumn(i), dt, options)
convertToDesiredType(table.getColumn(i), dt, options, removeQuotes)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,58 +162,106 @@ case class GpuJsonToStructs(
private lazy val jsonOptions =
GpuJsonReadCommon.cudfJsonOptions(parsedOptions)

/**
 * Reports whether `dt` contains an array nested inside another array.
 *
 * The walk descends through array element types and struct fields. The `foundArray` flag
 * records that an enclosing `ArrayType` has already been seen on the current path; struct
 * fields inherit the flag unchanged, so an array reached through a struct that itself sits
 * inside an array also counts as nested.
 *
 * @param dt         the data type to inspect
 * @param foundArray whether an `ArrayType` has already been seen on the path leading to `dt`
 * @return `true` if an `ArrayType` is reached while `foundArray` is already set;
 *         `false` for any type that is neither an array nor a struct
 */
private def hasArrayOfArray(dt: DataType, foundArray: Boolean = false): Boolean = dt match {
  case arrayType: ArrayType =>
    // A second array on the same path means nesting; otherwise keep descending with the flag set.
    foundArray || hasArrayOfArray(arrayType.elementType, foundArray = true)
  case structType: StructType =>
    structType.fields.exists(field => hasArrayOfArray(field.dataType, foundArray))
  case _ =>
    false
}

override protected def doColumnar(input: GpuColumnVector): cudf.ColumnVector = {
schema match {
case _: MapType =>
JSONUtils.extractRawMapFromJsonString(input.getBase)
case struct: StructType => {
case struct: StructType =>
// if we ever need to support duplicate keys we need to keep track of the duplicates
// and make the first one null, but I don't think this will ever happen in practice
val cudfSchema = makeSchema(struct)

// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: setup a datasource
val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
// Step 4: Have cudf parse the JSON data
try {
cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
} catch {
case e : RuntimeException =>
throw new JsonParsingException("Currently some Json to Struct cases " +
"are not supported. Consider to set spark.rapids.sql.expression.JsonToStructs" +
"=false", e)
if (hasArrayOfArray(struct)) {
// We cannot handle all corner cases with this right now. The parser just isn't
// good enough, but we will try to handle a few common ones.
val numRows = input.getRowCount.toInt

// Step 1: verify and preprocess the data to clean it up and normalize a few things
// Step 2: Concat the data into a single buffer
val (isNullOrEmpty, combined) = cleanAndConcat(input.getBase)
withResource(isNullOrEmpty) { isNullOrEmpty =>
// Step 3: setup a datasource
val table = withResource(new JsonDeviceDataSource(combined)) { ds =>
// Step 4: Have cudf parse the JSON data
try {
cudf.Table.readJSON(cudfSchema, jsonOptions, ds, numRows)
} catch {
case e: RuntimeException =>
throw new JsonParsingException("Currently some Json to Struct cases " +
"are not supported. Consider to set spark.rapids.sql.expression.JsonToStructs" +
"=false", e)
}
}

// process duplicated field names in input struct schema

// TableDebug.get.debug("table", table)

withResource(table) { _ =>
// Step 5: verify that the data looks correct
if (table.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly " +
"and we read a different number of rows than was expected. " +
s"Expected $numRows, but got ${table.getRowCount}")
}

// Step 6: turn the data into a Struct
withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// TableDebug.get.debug("structData", structData)
// Step 7: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
val out = isNullOrEmpty.ifElse(nullVal, structData)
// TableDebug.get.debug("out", out)
out
}
}
}
}
}
} else {
//System.out.println("cudfSchema.getFlattenedTypes does not contain LIST")
val table = JSONUtils.fromJsonToStructs(input.getBase, cudfSchema,
jsonOptions.leadingZerosAllowed, jsonOptions.nonNumericNumbersAllowed,
jsonOptions.unquotedControlChars)
// TableDebug.get.debug("input.getBase", input.getBase)
// TableDebug.get.debug("table from json", table)


// process duplicated field names in input struct schema

withResource(table) { _ =>
// Step 5: verify that the data looks correct
if (table.getRowCount != numRows) {
throw new IllegalStateException("The input data didn't parse correctly and we read " +
s"a different number of rows than was expected. Expected $numRows, " +
s"but got ${table.getRowCount}")
val convertedStructs =
withResource(table) { _ =>
withResource(convertTableToDesiredType(table, struct, parsedOptions,
removeQuotes = false)) {
columns => cudf.ColumnVector.makeStruct(columns: _*)
}
}

// Step 7: turn the data into a Struct
withResource(convertTableToDesiredType(table, struct, parsedOptions)) { columns =>
withResource(cudf.ColumnVector.makeStruct(columns: _*)) { structData =>
// Step 8: put nulls back in for nulls and empty strings
withResource(GpuScalar.from(null, struct)) { nullVal =>
isNullOrEmpty.ifElse(nullVal, structData)
}
// TableDebug.get.debug("convertedStructs", convertedStructs)

withResource(convertedStructs) { converted =>
withResource(JSONUtils.isNullOrEmpty(input.getBase)) { isNullOrEmpty =>
withResource(GpuScalar.from(null, struct)) { nullVal =>
val out = isNullOrEmpty.ifElse(nullVal, converted)
// TableDebug.get.debug("out from json", out)
out
}
}
}
}
}
case _ => throw new IllegalArgumentException(
s"GpuJsonToStructs currently does not support schema of type $schema.")
}
Expand Down
Loading