Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add date and timestamp support to to_json [databricks] #9600

Merged
merged 26 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.StructsTo

Known issues are:

- There is no support for timestamp types
- There can be rounding differences when formatting floating-point numbers as strings. For example, Spark may
produce `-4.1243574E26` but the GPU may produce `-4.124357351E26`.
- Not all JSON options are respected
Expand Down
10 changes: 5 additions & 5 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -14541,16 +14541,16 @@ are limited.
<td>S</td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td> </td>
</tr>
<tr>
Expand Down
152 changes: 148 additions & 4 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,8 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
long_gen,
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(date_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
pytest.param(timestamp_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
date_gen,
timestamp_gen,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some fallback tests for the options timestampFormat, dateFormat, and if "spark.sql.legacy.timeParserPolicy" is set to LEGACY? At least from the code all of these impact the format that is used to write the JSON output.

StringGen('[A-Za-z0-9\r\n\'"\\\\]{0,10}', nullable=True) \
.with_special_case('\u1f600') \
.with_special_case('"a"') \
Expand All @@ -800,8 +800,13 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
pytest.param(True, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9517')),
False
])
@pytest.mark.parametrize('timezone', [
'UTC',
'Etc/UTC',
pytest.param('UTC+07:00', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.xfail(condition = is_not_utc(), reason = 'xfail non-UTC time zone tests because of https://github.com/NVIDIA/spark-rapids/issues/9653')
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty, timezone):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a fallback test for non-utc?

struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
Expand All @@ -813,7 +818,8 @@ def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'ignoreNullFields': ignore_null_fields,
'pretty': pretty }
'pretty': pretty,
'timeZone': timezone}

def struct_to_json(spark):
df = gen_df(spark, gen)
Expand All @@ -825,3 +831,141 @@ def struct_to_json(spark):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : struct_to_json(spark),
conf=conf)

@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timestamp_format', [
    'yyyy-MM-dd\'T\'HH:mm:ss[.SSS][XXX]',
    pytest.param('yyyy-MM-dd\'T\'HH:mm:ss.SSSXXX', marks=pytest.mark.allow_non_gpu('ProjectExec')),
    pytest.param('dd/MM/yyyy\'T\'HH:mm:ss[.SSS][XXX]', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.parametrize('timezone', [
    'UTC',
    'Etc/UTC',
    pytest.param('UTC+07:00', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
def test_structs_to_json_timestamp(spark_tmp_path, data_gen, timestamp_format, timezone):
    # Compare CPU and GPU output of to_json for a struct that nests a
    # timestamp child; only the default format and UTC time zones run on
    # the GPU, the other parametrizations are expected to fall back.
    nested_gen = StructGen([('child', data_gen)], nullable=True)
    gen = StructGen(
        [('my_struct', StructGen([("b", nested_gen)], nullable=False))],
        nullable=False)

    json_options = {
        'timestampFormat': timestamp_format,
        'timeZone': timezone}

    conf = copy_and_update(
        _enable_all_types_conf,
        {'spark.rapids.sql.expression.StructsToJson': True})

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, gen).withColumn(
            "my_json", f.to_json("my_struct", json_options)),
        conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC+07:00'])
def test_structs_to_json_fallback_timezone(spark_tmp_path, data_gen, timezone):
    # Verify that to_json falls back to CPU (ProjectExec) when a non-UTC
    # time zone is requested, since the GPU only supports UTC.
    struct_gen = StructGen([
        ('a', data_gen),
        ("b", StructGen([('child', data_gen)], nullable=True)),
        ("c", ArrayGen(StructGen([('child', data_gen)], nullable=True))),
        # Fix: the two map fields were both named "d", producing duplicate
        # field names in the generated struct; each field is now unique.
        ("d", MapGen(LongGen(nullable=False), data_gen)),
        ("e", MapGen(StringGen('[A-Za-z0-9]{0,10}', nullable=False), data_gen)),
        ("f", ArrayGen(MapGen(LongGen(nullable=False), data_gen), nullable=True)),
    ], nullable=False)
    gen = StructGen([('my_struct', struct_gen)], nullable=False)

    options = { 'timeZone': timezone }

    def struct_to_json(spark):
        df = gen_df(spark, gen)
        return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

    conf = copy_and_update(_enable_all_types_conf,
        { 'spark.rapids.sql.expression.StructsToJson': True })

    assert_gpu_fallback_collect(
        lambda spark : struct_to_json(spark),
        'ProjectExec',
        conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [date_gen, timestamp_gen], ids=idfn)
def test_structs_to_json_fallback_legacy(spark_tmp_path, data_gen):
    # LEGACY timeParserPolicy is not supported on the GPU for to_json, so
    # the plan must fall back to the CPU ProjectExec.
    gen = StructGen([
        ('my_struct', StructGen([
            ("a", StructGen([('child', data_gen)], nullable=True)),
        ], nullable=False)),
    ], nullable=False)

    conf = copy_and_update(_enable_all_types_conf, {
        'spark.rapids.sql.expression.StructsToJson': True,
        'spark.sql.legacy.timeParserPolicy': 'LEGACY'})

    def struct_to_json(spark):
        return gen_df(spark, gen).withColumn(
            "my_json", f.to_json("my_struct")).drop("my_struct")

    assert_gpu_fallback_collect(struct_to_json, 'ProjectExec', conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [date_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC'])
@pytest.mark.parametrize('date_format', [
    'yyyy-dd-MM',
    'dd/MM/yyyy',
])
def test_structs_to_json_fallback_date_formats(spark_tmp_path, data_gen, timezone, date_format):
    # Non-default dateFormat values are not supported on the GPU, so to_json
    # must fall back to the CPU ProjectExec for each of these formats.
    inner_gen = StructGen([
        ('a', data_gen),
        ("b", StructGen([('child', data_gen)], nullable=True)),
    ], nullable=False)
    gen = StructGen([('my_struct', inner_gen)], nullable=False)

    json_options = {
        'timeZone': timezone,
        'dateFormat': date_format}

    conf = copy_and_update(
        _enable_all_types_conf,
        {'spark.rapids.sql.expression.StructsToJson': True})

    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, gen).withColumn(
            "my_json", f.to_json("my_struct", json_options)).drop("my_struct"),
        'ProjectExec',
        conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC'])
@pytest.mark.parametrize('timestamp_format', [
    'yyyy-MM-dd\'T\'HH:mm:ss.SSSXXX',
    'dd/MM/yyyy\'T\'HH:mm:ss[.SSS][XXX]',
])
# Fix: this test was named test_structs_to_json_fallback_date_formats, the
# same as the preceding date-format test, so pytest silently shadowed one of
# them and only ever collected/ran a single test. Renamed to match what it
# actually exercises: fallback for unsupported timestampFormat values.
def test_structs_to_json_fallback_timestamp_formats(spark_tmp_path, data_gen, timezone, timestamp_format):
    struct_gen = StructGen([
        ('a', data_gen),
        ("b", StructGen([('child', data_gen)], nullable=True)),
    ], nullable=False)
    gen = StructGen([('my_struct', struct_gen)], nullable=False)

    options = { 'timeZone': timezone,
                'timestampFormat': timestamp_format }

    def struct_to_json(spark):
        df = gen_df(spark, gen)
        return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

    conf = copy_and_update(_enable_all_types_conf,
        { 'spark.rapids.sql.expression.StructsToJson': True })

    assert_gpu_fallback_collect(
        lambda spark : struct_to_json(spark),
        'ProjectExec',
        conf=conf)
17 changes: 14 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ object GpuCast {
fromDataType: DataType, options: CastOptions): ColumnVector = fromDataType match {
case StringType => input.copyToColumnVector()
case DateType => input.asStrings("%Y-%m-%d")
case TimestampType if options.castToJsonString => castTimestampToJson(input)
case TimestampType => castTimestampToString(input)
case FloatType | DoubleType => castFloatingTypeToString(input)
case BinaryType => castBinToString(input, options)
Expand Down Expand Up @@ -773,6 +774,14 @@ object GpuCast {
}
}

private def castTimestampToJson(input: ColumnView): ColumnVector = {
  // This path is only taken when the JSON time zone is UTC (a non-UTC zone
  // falls back to the CPU), so hard-coding the `Z` suffix is safe for now.
  // Once custom formats are supported, the pattern should come from
  // CastOptions instead — see
  // https://github.com/NVIDIA/spark-rapids/issues/9602
  val utcJsonTimestampFormat = "%Y-%m-%dT%H:%M:%S.%3fZ"
  input.asStrings(utcJsonTimestampFormat)
}

/**
* A 5 steps solution for concatenating string array column. <p>
* Giving an input with 3 rows:
Expand Down Expand Up @@ -932,7 +941,8 @@ object GpuCast {
// to be represented by the string literal `null`
val strValue = closeOnExcept(strKey) { _ =>
withResource(kvStructColumn.getChildColumnView(1)) { valueColumn =>
val valueStr = if (valueColumn.getType == DType.STRING) {
val dt = valueColumn.getType
val valueStr = if (dt == DType.STRING || dt.isDurationType || dt.isTimestampType) {
withResource(castToString(valueColumn, from.valueType, options)) { valueStr =>
addQuotes(valueStr, valueColumn.getRowCount.toInt)
}
Expand Down Expand Up @@ -1102,8 +1112,9 @@ object GpuCast {
colon: ColumnVector,
quote: ColumnVector): ColumnVector = {
val jsonName = StringEscapeUtils.escapeJson(inputSchema(fieldIndex).name)
val dataType = inputSchema(fieldIndex).dataType
val needsQuoting = dataType == DataTypes.StringType
val dt = inputSchema(fieldIndex).dataType
val needsQuoting = dt == DataTypes.StringType || dt == DataTypes.DateType ||
dt == DataTypes.TimestampType
withResource(input.getChildColumnView(fieldIndex)) { cv =>
withResource(ArrayBuffer.empty[ColumnVector]) { attrColumns =>
// prefix with quoted column name followed by colon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3600,20 +3600,14 @@ object GpuOverrides extends Logging {
TypeSig.STRING,
Seq(ParamCheck("struct",
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
))),
(a, conf, p, r) => new UnaryExprMeta[StructsToJson](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (a.options.get("pretty").exists(_.equalsIgnoreCase("true"))) {
willNotWorkOnGpu("to_json option pretty=true is not supported")
}
}

override def convertToGpu(child: Expression): GpuExpression =
GpuStructsToJson(a.options, child, a.timeZoneId)
}).disabledByDefault("to_json support is experimental. See compatibility " +
(a, conf, p, r) => new GpuStructsToJsonMeta(a, conf, p, r))
.disabledByDefault("to_json support is experimental. See compatibility " +
"guide for more information."),
expr[JsonTuple](
"Returns a tuple like the function get_json_object, but it takes multiple names. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,64 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf.ColumnVector
import com.nvidia.spark.rapids.{CastOptions, GpuCast, GpuColumnVector, GpuUnaryExpression}
import com.nvidia.spark.rapids.{CastOptions, DataFromReplacementRule, GpuCast, GpuColumnVector, GpuExpression, GpuUnaryExpression, RapidsConf, RapidsMeta, UnaryExprMeta}
import com.nvidia.spark.rapids.GpuOverrides
import com.nvidia.spark.rapids.shims.LegacyBehaviorPolicyShim

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{Expression, StructsToJson}
import org.apache.spark.sql.catalyst.json.GpuJsonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StringType, StructType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, DateType, StringType, StructType, TimestampType}

class GpuStructsToJsonMeta(
    expr: StructsToJson,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]],
    rule: DataFromReplacementRule
  ) extends UnaryExprMeta[StructsToJson](expr, conf, parent, rule) {

  override def tagExprForGpu(): Unit = {
    // The GPU implementation does not honor the `pretty` option.
    if (expr.options.get("pretty").exists(_.equalsIgnoreCase("true"))) {
      willNotWorkOnGpu("to_json option pretty=true is not supported")
    }
    val options = GpuJsonUtils.parseJSONOptions(expr.options)
    val hasDates = TrampolineUtil.dataTypeExistsRecursively(expr.child.dataType,
      _.isInstanceOf[DateType])
    if (hasDates) {
      GpuJsonUtils.dateFormatInWrite(options) match {
        case "yyyy-MM-dd" =>
        case dateFormat =>
          // we can likely support other formats but we would need to add tests
          // tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602
          willNotWorkOnGpu(s"Unsupported dateFormat '$dateFormat' in to_json")
      }
    }
    val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(expr.child.dataType,
      _.isInstanceOf[TimestampType])
    if (hasTimestamps) {
      GpuJsonUtils.timestampFormatInWrite(options) match {
        case "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]" =>
        case timestampFormat =>
          // we can likely support other formats but we would need to add tests
          // tracking issue is https://github.com/NVIDIA/spark-rapids/issues/9602
          willNotWorkOnGpu(s"Unsupported timestampFormat '$timestampFormat' in to_json")
      }
      if (options.zoneId.normalized() != GpuOverrides.UTC_TIMEZONE_ID) {
        // we hard-code the timezone `Z` in GpuCast.castTimestampToJson
        // so we need to fall back if a different timeZone is specified
        willNotWorkOnGpu(s"Unsupported timeZone '${options.zoneId}' in to_json")
      }
    }

    if (LegacyBehaviorPolicyShim.isLegacyTimeParserPolicy) {
      // Fix: this message previously said "GpuJsonToStructs" (the from_json
      // expression) even though this meta handles to_json / GpuStructsToJson.
      willNotWorkOnGpu("LEGACY timeParserPolicy is not supported in GpuStructsToJson")
    }
  }

  override def convertToGpu(child: Expression): GpuExpression =
    GpuStructsToJson(expr.options, child, expr.timeZoneId)
}

case class GpuStructsToJson(
options: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,22 @@ object GpuJsonUtils {
optionalTimestampFormatInRead(parseJSONReadOptions(options))

def timestampFormatInRead(options: JSONOptions): String = options.timestampFormat

// Date pattern used when writing JSON (to_json). In this shim version the
// JSONOptions class exposes a single `dateFormat` for both read and write.
def dateFormatInWrite(options: JSONOptions): String =
  options.dateFormat

// Timestamp pattern used when writing JSON (to_json); single field shared
// with the read path in this shim version.
def timestampFormatInWrite(options: JSONOptions): String =
  options.timestampFormat

// This shim never enables the CPU date/time parsing fallback.
def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

// Build a general (write-capable) JSONOptions from the raw option map,
// using the session-local time zone so format defaults match what Spark's
// CPU implementation of to_json would use.
def parseJSONOptions(options: Map[String, String]) = {
  new JSONOptions(
    options,
    SQLConf.get.sessionLocalTimeZone,
    SQLConf.get.columnNameOfCorruptRecord)
}

def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,22 @@ object GpuJsonUtils {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

// Date pattern used when writing JSON (to_json). In this shim version
// JSONOptions distinguishes read and write formats, so delegate to the
// dedicated write-side accessor.
def dateFormatInWrite(options: JSONOptions): String =
  options.dateFormatInWrite

// Timestamp pattern used when writing JSON (to_json); write-side accessor.
def timestampFormatInWrite(options: JSONOptions): String =
  options.timestampFormatInWrite

// This shim never enables the CPU date/time parsing fallback.
def enableDateTimeParsingFallback(options: JSONOptions): Boolean = false

// Build a general (write-capable) JSONOptions from the raw option map,
// using the session-local time zone so format defaults match what Spark's
// CPU implementation of to_json would use.
def parseJSONOptions(options: Map[String, String]) = {
  new JSONOptions(
    options,
    SQLConf.get.sessionLocalTimeZone,
    SQLConf.get.columnNameOfCorruptRecord)
}


def parseJSONReadOptions(options: Map[String, String]) = {
new JSONOptionsInRead(
options,
Expand Down
Loading