Add date and timestamp support to to_json [databricks] #9600

Merged: 26 commits, Nov 29, 2023
1 change: 0 additions & 1 deletion docs/compatibility.md
@@ -346,7 +346,6 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.StructsTo

Known issues are:

- There is no support for timestamp types
- There can be rounding differences when formatting floating-point numbers as strings. For example, Spark may
produce `-4.1243574E26` but the GPU may produce `-4.124357351E26`.
- Not all JSON options are respected
10 changes: 5 additions & 5 deletions docs/supported_ops.md
@@ -14541,16 +14541,16 @@ are limited.
<td>S</td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td> </td>
</tr>
<tr>
10 changes: 6 additions & 4 deletions integration_tests/src/main/python/json_test.py
@@ -612,8 +612,8 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
long_gen,
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(date_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
pytest.param(timestamp_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
date_gen,
timestamp_gen,
Collaborator comment:
Can we add some fallback tests for the options timestampFormat and dateFormat, and for when "spark.sql.legacy.timeParserPolicy" is set to LEGACY? Judging from the code, all of these affect the format used to write the JSON output.
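The option checks the comment refers to boil down to a small predicate: given the to_json options and the schema contents, decide whether the GPU plan must fall back to CPU. A hedged pure-Python model of that logic follows; the option names and default formats are taken from this PR's diff, but the helper itself is illustrative, not the plugin's actual Scala code.

```python
# Illustrative model of the CPU-fallback decision for to_json.
# Defaults mirror the constants in the PR's GpuOverrides changes.
DEFAULT_DATE_FORMAT = "yyyy-MM-dd"
DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"

def to_json_falls_back(options, has_dates=False, has_timestamps=False):
    """Return a reason string if to_json must fall back to CPU, else None."""
    # pretty-printed output is not supported on the GPU
    if options.get("pretty", "false").lower() == "true":
        return "pretty=true is not supported"
    # only the default dateFormat is supported for DATE columns
    if has_dates and options.get("dateFormat", DEFAULT_DATE_FORMAT) != DEFAULT_DATE_FORMAT:
        return "unsupported dateFormat"
    if has_timestamps:
        # only the default timestampFormat is supported
        if options.get("timestampFormat", DEFAULT_TIMESTAMP_FORMAT) != DEFAULT_TIMESTAMP_FORMAT:
            return "unsupported timestampFormat"
        # the GPU kernel hard-codes a trailing Z, so only UTC works
        if options.get("timeZone", "UTC") not in ("UTC", "Etc/UTC"):
            return "unsupported timeZone"
    return None
```

For example, `to_json_falls_back({"timeZone": "America/New_York"}, has_timestamps=True)` reports the unsupported timezone, while an empty options dict stays on the GPU.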

StringGen('[A-Za-z0-9\r\n\'"\\\\]{0,10}', nullable=True) \
.with_special_case('\u1f600') \
.with_special_case('"a"') \
@@ -628,7 +628,8 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
pytest.param(True, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9517')),
False
])
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
@pytest.mark.parametrize('timezone', ['UTC', 'Etc/UTC'])
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty, timezone):
Collaborator comment:
Can we add a fallback test for non-UTC timezones?

struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
@@ -640,7 +641,8 @@ def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'ignoreNullFields': ignore_null_fields,
'pretty': pretty }
'pretty': pretty,
'timeZone': timezone}

def struct_to_json(spark):
df = gen_df(spark, gen)
17 changes: 14 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -730,6 +730,7 @@ object GpuCast {
fromDataType: DataType, options: CastOptions): ColumnVector = fromDataType match {
case StringType => input.copyToColumnVector()
case DateType => input.asStrings("%Y-%m-%d")
case TimestampType if options.castToJsonString => castTimestampToJson(input)
case TimestampType => castTimestampToString(input)
case FloatType | DoubleType => castFloatingTypeToString(input)
case BinaryType => castBinToString(input, options)
@@ -773,6 +774,14 @@
}
}

private def castTimestampToJson(input: ColumnView): ColumnVector = {
// we fall back to CPU if the JSON timezone is not UTC, so it is safe
// to hard-code `Z` here for now, but we should really add a timestamp
// format to CastOptions when we add support for custom formats in
// https://github.com/NVIDIA/spark-rapids/issues/9602
input.asStrings("%Y-%m-%dT%H:%M:%S.%3fZ")
}
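The cudf format string above produces ISO-8601 strings with millisecond precision and a literal `Z`. A hedged Python sketch of the equivalent formatting (illustrative only; `%3f` is cudf's millisecond specifier, which plain `strftime` lacks):

```python
from datetime import datetime, timezone

def format_timestamp_for_json(ts: datetime) -> str:
    """Format a timestamp the way "%Y-%m-%dT%H:%M:%S.%3fZ" does:
    UTC, millisecond precision, literal trailing Z."""
    ts = ts.astimezone(timezone.utc)
    # Python's %f is microseconds; truncate to milliseconds like cudf's %3f.
    return ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z"

print(format_timestamp_for_json(
    datetime(2023, 11, 29, 12, 34, 56, 789000, tzinfo=timezone.utc)))
# → 2023-11-29T12:34:56.789Z
```

The hard-coded `Z` is what forces the CPU fallback for non-UTC session timezones elsewhere in this PR.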

/**
* A 5 steps solution for concatenating string array column. <p>
* Giving an input with 3 rows:
@@ -932,7 +941,8 @@
// to be represented by the string literal `null`
val strValue = closeOnExcept(strKey) { _ =>
withResource(kvStructColumn.getChildColumnView(1)) { valueColumn =>
val valueStr = if (valueColumn.getType == DType.STRING) {
val dt = valueColumn.getType
val valueStr = if (dt == DType.STRING || dt.isDurationType || dt.isTimestampType) {
withResource(castToString(valueColumn, from.valueType, options)) { valueStr =>
addQuotes(valueStr, valueColumn.getRowCount.toInt)
}
@@ -1102,8 +1112,9 @@
colon: ColumnVector,
quote: ColumnVector): ColumnVector = {
val jsonName = StringEscapeUtils.escapeJson(inputSchema(fieldIndex).name)
val dataType = inputSchema(fieldIndex).dataType
val needsQuoting = dataType == DataTypes.StringType
val dt = inputSchema(fieldIndex).dataType
val needsQuoting = dt == DataTypes.StringType || dt == DataTypes.DateType ||
dt == DataTypes.TimestampType
withResource(input.getChildColumnView(fieldIndex)) { cv =>
withResource(ArrayBuffer.empty[ColumnVector]) { attrColumns =>
// prefix with quoted column name followed by colon
@@ -3598,15 +3598,47 @@ object GpuOverrides extends Logging {
TypeSig.STRING,
Seq(ParamCheck("struct",
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
))),
(a, conf, p, r) => new UnaryExprMeta[StructsToJson](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (a.options.get("pretty").exists(_.equalsIgnoreCase("true"))) {
willNotWorkOnGpu("to_json option pretty=true is not supported")
}
val hasDates = TrampolineUtil.dataTypeExistsRecursively(a.child.dataType,
_.isInstanceOf[DateType])
if (hasDates) {
// check if the default format is being used
val defaultFormat = "yyyy-MM-dd"
val dateFormat = a.options.getOrElse("dateFormat", defaultFormat)
if (dateFormat != defaultFormat) {
// we can likely support other formats but we would need to add tests
// tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602
willNotWorkOnGpu(s"Unsupported dateFormat '$dateFormat' in to_json")
}
}
val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(a.child.dataType,
_.isInstanceOf[TimestampType])
if (hasTimestamps) {
// check if the default format is being used
val defaultFormat = "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"
val timestampFormat = a.options.getOrElse("timestampFormat", defaultFormat)
Collaborator comment:
I don't think this does 100% of what we want.

https://github.com/apache/spark/blob/7120e6b88f2327ffb71c4bca14b10b15aeb26c32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L125-L130

Include a check for the legacyTimeParserPolicy. Either we need to use JSONOptions directly to get the format that we should use, or we need to replicate its behavior exactly. I would prefer JSONOptions if possible, even through a trampoline, because then we know exactly what the format is, even if it changes from one version of Spark to another.
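The Spark behavior the linked JSONOptions code implements is a policy-dependent default: when no explicit timestampFormat is given, the write format differs under the LEGACY time parser policy. A hedged Python sketch of that selection (default strings taken from the linked JSONOptions.scala; the function itself is illustrative):

```python
# Illustrative model of Spark's JSONOptions.timestampFormatInWrite:
# an explicit option wins; otherwise the default depends on
# spark.sql.legacy.timeParserPolicy.
def default_timestamp_format_in_write(options, legacy_time_parser_policy):
    if "timestampFormat" in options:
        return options["timestampFormat"]
    if legacy_time_parser_policy == "LEGACY":
        return "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
    return "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"
```

Comparing the user option against a single hard-coded default, as the diff does, misses the LEGACY branch, which is the gap the comment points out.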

if (timestampFormat != defaultFormat) {
// we can likely support other formats but we would need to add tests
// tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602
willNotWorkOnGpu(s"Unsupported timestampFormat '$timestampFormat' in to_json")
}
val timeZone = a.options.getOrElse("timeZone", SQLConf.get.sessionLocalTimeZone)
if (timeZone != "UTC" && timeZone != "Etc/UTC") {
// we hard-code the timezone `Z` in GpuCast.castTimestampToJson
// so we need to fall back if a different timeZone is specified
willNotWorkOnGpu(s"Unsupported timeZone '$timeZone' in to_json")
}
}
}

override def convertToGpu(child: Expression): GpuExpression =
2 changes: 1 addition & 1 deletion tools/generated_files/supportedExprs.csv
@@ -539,7 +539,7 @@
StringTrimLeft,S,`ltrim`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
StringTrimRight,S,`rtrim`,None,project,src,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
StringTrimRight,S,`rtrim`,None,project,trimStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
StringTrimRight,S,`rtrim`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,struct,S,S,S,S,S,S,S,NA,NA,S,NA,NA,NA,NA,S,S,S,NA
StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,struct,S,S,S,S,S,S,S,S,PS,S,NA,NA,NA,NA,PS,PS,PS,NA
StructsToJson,NS,`to_json`,This is disabled by default because to_json support is experimental. See compatibility guide for more information.,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,str,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,pos,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA