diff --git a/docs/compatibility.md b/docs/compatibility.md index 79ba3f0da66..9d411f56d50 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -375,7 +375,6 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.StructsTo Known issues are: -- There is no support for timestamp types - There can be rounding differences when formatting floating-point numbers as strings. For example, Spark may produce `-4.1243574E26` but the GPU may produce `-4.124357351E26`. - Not all JSON options are respected diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 1566a291f36..d691b3994d2 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -14541,16 +14541,16 @@ are limited.
* Giving an input with 3 rows: @@ -932,7 +941,8 @@ object GpuCast { // to be represented by the string literal `null` val strValue = closeOnExcept(strKey) { _ => withResource(kvStructColumn.getChildColumnView(1)) { valueColumn => - val valueStr = if (valueColumn.getType == DType.STRING) { + val dt = valueColumn.getType + val valueStr = if (dt == DType.STRING || dt.isDurationType || dt.isTimestampType) { withResource(castToString(valueColumn, from.valueType, options)) { valueStr => addQuotes(valueStr, valueColumn.getRowCount.toInt) } @@ -1102,8 +1112,9 @@ object GpuCast { colon: ColumnVector, quote: ColumnVector): ColumnVector = { val jsonName = StringEscapeUtils.escapeJson(inputSchema(fieldIndex).name) - val dataType = inputSchema(fieldIndex).dataType - val needsQuoting = dataType == DataTypes.StringType + val dt = inputSchema(fieldIndex).dataType + val needsQuoting = dt == DataTypes.StringType || dt == DataTypes.DateType || + dt == DataTypes.TimestampType withResource(input.getChildColumnView(fieldIndex)) { cv => withResource(ArrayBuffer.empty[ColumnVector]) { attrColumns => // prefix with quoted column name followed by colon diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index fa9346f0ef4..633f1c046ae 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3604,20 +3604,14 @@ object GpuOverrides extends Logging { TypeSig.STRING, Seq(ParamCheck("struct", (TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT + - TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), + TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP + + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), (TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT + - TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested() + 
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP + + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested() ))), - (a, conf, p, r) => new UnaryExprMeta[StructsToJson](a, conf, p, r) { - override def tagExprForGpu(): Unit = { - if (a.options.get("pretty").exists(_.equalsIgnoreCase("true"))) { - willNotWorkOnGpu("to_json option pretty=true is not supported") - } - } - - override def convertToGpu(child: Expression): GpuExpression = - GpuStructsToJson(a.options, child, a.timeZoneId) - }).disabledByDefault("to_json support is experimental. See compatibility " + + (a, conf, p, r) => new GpuStructsToJsonMeta(a, conf, p, r)) + .disabledByDefault("to_json support is experimental. See compatibility " + "guide for more information."), expr[JsonTuple]( "Returns a tuple like the function get_json_object, but it takes multiple names. " + diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala index 3e674ecb6d8..ea12a483c82 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuStructsToJson.scala @@ -17,11 +17,64 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf.ColumnVector -import com.nvidia.spark.rapids.{CastOptions, GpuCast, GpuColumnVector, GpuUnaryExpression} +import com.nvidia.spark.rapids.{CastOptions, DataFromReplacementRule, GpuCast, GpuColumnVector, GpuExpression, GpuUnaryExpression, RapidsConf, RapidsMeta, UnaryExprMeta} +import com.nvidia.spark.rapids.GpuOverrides +import com.nvidia.spark.rapids.shims.LegacyBehaviorPolicyShim -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Expression, StructsToJson} +import org.apache.spark.sql.catalyst.json.GpuJsonUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, StringType, StructType} +import 
org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.types.{DataType, DateType, StringType, StructType, TimestampType} + +class GpuStructsToJsonMeta( + expr: StructsToJson, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule + ) extends UnaryExprMeta[StructsToJson](expr, conf, parent, rule) { + + override def tagExprForGpu(): Unit = { + if (expr.options.get("pretty").exists(_.equalsIgnoreCase("true"))) { + willNotWorkOnGpu("to_json option pretty=true is not supported") + } + val options = GpuJsonUtils.parseJSONOptions(expr.options) + val hasDates = TrampolineUtil.dataTypeExistsRecursively(expr.child.dataType, + _.isInstanceOf[DateType]) + if (hasDates) { + GpuJsonUtils.dateFormatInWrite(options) match { + case "yyyy-MM-dd" => + case dateFormat => + // we can likely support other formats but we would need to add tests + // tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602 + willNotWorkOnGpu(s"Unsupported dateFormat '$dateFormat' in to_json") + } + } + val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(expr.child.dataType, + _.isInstanceOf[TimestampType]) + if (hasTimestamps) { + GpuJsonUtils.timestampFormatInWrite(options) match { + case "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]" => + case timestampFormat => + // we can likely support other formats but we would need to add tests + // tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602 + willNotWorkOnGpu(s"Unsupported timestampFormat '$timestampFormat' in to_json") + } + if (options.zoneId.normalized() != GpuOverrides.UTC_TIMEZONE_ID) { + // we hard-code the timezone `Z` in GpuCast.castTimestampToJson + // so we need to fall back if a different timeZone is specified + willNotWorkOnGpu(s"Unsupported timeZone '${options.zoneId}' in to_json") + } + } + + if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) { + willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuStructsToJson") + } + } + + override 
def convertToGpu(child: Expression): GpuExpression = + GpuStructsToJson(expr.options, child, expr.timeZoneId) +} case class GpuStructsToJson( options: Map[String, String], diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 7b7b680db24..ce5ecff513c 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -48,8 +48,22 @@ object GpuJsonUtils { optionalTimestampFormatInRead(parseJSONReadOptions(options)) def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat + + def dateFormatInWrite(options: JSONOptions): String = + options.dateFormat + + def timestampFormatInWrite(options: JSONOptions): String = + options.timestampFormat + def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false + def parseJSONOptions(options: Map[String, String]) = { + new JSONOptions( + options, + SQLConf.get.sessionLocalTimeZone, + SQLConf.get.columnNameOfCorruptRecord) + } + def parseJSONReadOptions(options: Map[String, String]) = { new JSONOptionsInRead( options, diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 5f1d8929887..68c3996131a 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -49,8 +49,22 @@ object GpuJsonUtils { s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + def dateFormatInWrite(options: JSONOptions): String = + options.dateFormatInWrite + + def timestampFormatInWrite(options: JSONOptions): String = + options.timestampFormatInWrite + def 
enableDateTimeParsingFallback(options: JSONOptions): Boolean = false + def parseJSONOptions(options: Map[String, String]) = { + new JSONOptions( + options, + SQLConf.get.sessionLocalTimeZone, + SQLConf.get.columnNameOfCorruptRecord) + } + + def parseJSONReadOptions(options: Map[String, String]) = { new JSONOptionsInRead( options, diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 33989821009..e132ee94baf 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -56,8 +56,22 @@ object GpuJsonUtils { s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + def dateFormatInWrite(options: JSONOptions): String = + options.dateFormatInWrite + + def timestampFormatInWrite(options: JSONOptions): String = + options.timestampFormatInWrite + def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false + def parseJSONOptions(options: Map[String, String]) = { + new JSONOptions( + options, + SQLConf.get.sessionLocalTimeZone, + SQLConf.get.columnNameOfCorruptRecord) + } + + def parseJSONReadOptions(options: Map[String, String]) = { new JSONOptionsInRead( options, diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala index 4685cc0d289..afa0c58ea45 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/catalyst/json/GpuJsonUtils.scala @@ -60,9 +60,22 @@ object GpuJsonUtils { s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + def dateFormatInWrite(options: JSONOptions): String = + options.dateFormatInWrite + + def 
timestampFormatInWrite(options: JSONOptions): String = + options.timestampFormatInWrite + def enableDateTimeParsingFallback(options: JSONOptions): Boolean = options.enableDateTimeParsingFallback.getOrElse(false) + def parseJSONOptions(options: Map[String, String]) = { + new JSONOptions( + options, + SQLConf.get.sessionLocalTimeZone, + SQLConf.get.columnNameOfCorruptRecord) + } + def parseJSONReadOptions(options: Map[String, String]) = { new JSONOptionsInRead( options, diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 14b2f8f784d..033850a7713 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -539,7 +539,7 @@ StringTrimLeft,S,`ltrim`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA, StringTrimRight,S,`rtrim`,None,project,src,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA StringTrimRight,S,`rtrim`,None,project,trimStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA StringTrimRight,S,`rtrim`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA -StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,struct,S,S,S,S,S,S,S,NA,NA,S,NA,NA,NA,NA,S,S,S,NA +StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,struct,S,S,S,S,S,S,S,S,PS,S,NA,NA,NA,NA,PS,PS,PS,NA StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA Substring,S,`substr`; `substring`,None,project,str,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,NA Substring,S,`substr`; `substring`,None,project,pos,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA