Commit

upmerge
andygrove committed Nov 29, 2023
2 parents fce5187 + 7efcb81 commit 24fb229
Showing 19 changed files with 464 additions and 105 deletions.
1 change: 0 additions & 1 deletion docs/compatibility.md
@@ -375,7 +375,6 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.StructsToJson=true`

Known issues are:

- There is no support for timestamp types
- There can be rounding differences when formatting floating-point numbers as strings. For example, Spark may
produce `-4.1243574E26` but the GPU may produce `-4.124357351E26`.
- Not all JSON options are respected
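As a hedged illustration of the rounding issue noted above, here is a minimal PySpark sketch (plain Spark; running the GPU path additionally assumes the RAPIDS plugin is on the classpath with `spark.rapids.sql.expression.StructsToJson=true`; the values are illustrative):

```python
# Minimal sketch of a to_json call where CPU/GPU float formatting may differ.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(-4.1243574e26,)], "a: float")
# On CPU Spark this may print {"a":-4.1243574E26}; per the note above, the
# GPU may render more digits, e.g. {"a":-4.124357351E26}.
df.select(f.to_json(f.struct(f.col("a"))).alias("json")).show(truncate=False)
```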
10 changes: 5 additions & 5 deletions docs/supported_ops.md
@@ -14541,16 +14541,16 @@ are limited.
<td>S</td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP</em></td>
<td> </td>
</tr>
<tr>
3 changes: 1 addition & 2 deletions integration_tests/src/main/python/cast_test.py
@@ -61,12 +61,11 @@ def test_cast_nested(data_gen, to_type):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type)))

@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9781")
def test_cast_string_date_valid_format():
# In Spark 3.2.0+ the valid formats changed, and we cannot support all of them.
# This provides values that are valid in all of those formats.
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}')).select(f.col('a').cast(DateType())),
lambda spark : unary_op_df(spark, StringGen('[0-9]{0,3}[1-9]-[0-9]{1,2}-[0-9]{1,2}')).select(f.col('a').cast(DateType())),
conf = {'spark.rapids.sql.hasExtendedYearValues': 'false'})

invalid_values_string_to_date = ['200', ' 1970A', '1970 A', '1970T', # does not conform to "yyyy" after trim
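One reading of the regex change above (an interpretation, not stated in the diff): the old pattern could generate an all-zero year such as `0000`, while the new pattern forces a non-zero final digit. A small sketch:

```python
# Sketch comparing the old and new year patterns from the test above.
# The new pattern rules out all-zero years like "0000" (at the cost of
# also skipping years ending in 0) -- an interpretation of the change.
import re

old_year = re.compile(r'^[0-9]{1,4}$')
new_year = re.compile(r'^[0-9]{0,3}[1-9]$')

assert old_year.match('0000') is not None
assert new_year.match('0000') is None
assert new_year.match('1999') is not None
```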
19 changes: 14 additions & 5 deletions integration_tests/src/main/python/date_time_test.py
@@ -286,22 +286,31 @@ def test_from_utc_timestamp(data_gen, time_zone):
lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST"], ids=idfn)
@pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "EST", "MST", "VST", "PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
def test_from_utc_timestamp_non_utc_fallback(data_gen, time_zone):
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
'FromUTCTimestamp')

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
'FromUTCTimestamp',
conf = {"spark.rapids.sql.nonUTC.enabled": "true"})


@pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn)
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.xfail(condition = is_not_utc(), reason = 'xfail non-UTC time zone tests because of https://github.com/NVIDIA/spark-rapids/issues/9653')
def test_from_utc_timestamp_supported_timezones(data_gen, time_zone):
# Remove spark.rapids.test.CPU.timezone configuration when GPU kernel is ready to really test on GPU
# TODO: Remove spark.rapids.sql.nonUTC.enabled configuration
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.test.CPU.timezone": "true"})

lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
conf = {"spark.rapids.sql.nonUTC.enabled": "true"})

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
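For context on what these tests exercise: `from_utc_timestamp` interprets its input as UTC and shifts it into the given zone. A minimal plain-Spark sketch (no RAPIDS required):

```python
# from_utc_timestamp shifts a UTC timestamp into the target zone, e.g.
# 2023-01-01 00:00:00 UTC -> 2023-01-01 08:00:00 in Asia/Shanghai (UTC+8).
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2023-01-01 00:00:00",)], "ts: string")
df.select(f.from_utc_timestamp(f.col("ts").cast("timestamp"),
                               "Asia/Shanghai")).show(truncate=False)
```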
152 changes: 148 additions & 4 deletions integration_tests/src/main/python/json_test.py
@@ -787,8 +787,8 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
decimal_gen_128bit,
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9350')),
pytest.param(date_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
pytest.param(timestamp_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9515')),
date_gen,
timestamp_gen,
StringGen('[A-Za-z0-9\r\n\'"\\\\]{0,10}', nullable=True) \
.with_special_case('\u1f600') \
.with_special_case('"a"') \
@@ -803,8 +803,13 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
pytest.param(True, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9517')),
False
])
@pytest.mark.parametrize('timezone', [
'UTC',
'Etc/UTC',
pytest.param('UTC+07:00', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.xfail(condition = is_not_utc(), reason = 'xfail non-UTC time zone tests because of https://github.com/NVIDIA/spark-rapids/issues/9653')
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty, timezone):
struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
@@ -816,7 +821,8 @@ def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'ignoreNullFields': ignore_null_fields,
'pretty': pretty }
'pretty': pretty,
'timeZone': timezone}

def struct_to_json(spark):
df = gen_df(spark, gen)
@@ -828,3 +834,141 @@ def struct_to_json(spark):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : struct_to_json(spark),
conf=conf)

@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timestamp_format', [
'yyyy-MM-dd\'T\'HH:mm:ss[.SSS][XXX]',
pytest.param('yyyy-MM-dd\'T\'HH:mm:ss.SSSXXX', marks=pytest.mark.allow_non_gpu('ProjectExec')),
pytest.param('dd/MM/yyyy\'T\'HH:mm:ss[.SSS][XXX]', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
@pytest.mark.parametrize('timezone', [
'UTC',
'Etc/UTC',
pytest.param('UTC+07:00', marks=pytest.mark.allow_non_gpu('ProjectExec')),
])
def test_structs_to_json_timestamp(spark_tmp_path, data_gen, timestamp_format, timezone):
struct_gen = StructGen([
("b", StructGen([('child', data_gen)], nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'timestampFormat': timestamp_format,
'timeZone': timezone}

def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct", options))

conf = copy_and_update(_enable_all_types_conf,
{ 'spark.rapids.sql.expression.StructsToJson': True })

assert_gpu_and_cpu_are_equal_collect(
lambda spark : struct_to_json(spark),
conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC+07:00'])
def test_structs_to_json_fallback_timezone(spark_tmp_path, data_gen, timezone):
struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
("c", ArrayGen(StructGen([('child', data_gen)], nullable=True))),
("d", MapGen(LongGen(nullable=False), data_gen)),
("d", MapGen(StringGen('[A-Za-z0-9]{0,10}', nullable=False), data_gen)),
("e", ArrayGen(MapGen(LongGen(nullable=False), data_gen), nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'timeZone': timezone }

def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

conf = copy_and_update(_enable_all_types_conf,
{ 'spark.rapids.sql.expression.StructsToJson': True })

assert_gpu_fallback_collect(
lambda spark : struct_to_json(spark),
'ProjectExec',
conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [date_gen, timestamp_gen], ids=idfn)
def test_structs_to_json_fallback_legacy(spark_tmp_path, data_gen):
struct_gen = StructGen([
("a", StructGen([('child', data_gen)], nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)

def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct")).drop("my_struct")

conf = copy_and_update(_enable_all_types_conf,
{ 'spark.rapids.sql.expression.StructsToJson': True,
'spark.sql.legacy.timeParserPolicy': 'LEGACY'})

assert_gpu_fallback_collect(
lambda spark : struct_to_json(spark),
'ProjectExec',
conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [date_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC'])
@pytest.mark.parametrize('date_format', [
'yyyy-dd-MM',
'dd/MM/yyyy',
])
def test_structs_to_json_fallback_date_formats(spark_tmp_path, data_gen, timezone, date_format):
struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'timeZone': timezone,
'dateFormat': date_format }

def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

conf = copy_and_update(_enable_all_types_conf,
{ 'spark.rapids.sql.expression.StructsToJson': True })

assert_gpu_fallback_collect(
lambda spark : struct_to_json(spark),
'ProjectExec',
conf=conf)

@allow_non_gpu('ProjectExec')
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.parametrize('timezone', ['UTC'])
@pytest.mark.parametrize('timestamp_format', [
'yyyy-MM-dd\'T\'HH:mm:ss.SSSXXX',
'dd/MM/yyyy\'T\'HH:mm:ss[.SSS][XXX]',
])
def test_structs_to_json_fallback_timestamp_formats(spark_tmp_path, data_gen, timezone, timestamp_format):
struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)

options = { 'timeZone': timezone,
'timestampFormat': timestamp_format }

def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

conf = copy_and_update(_enable_all_types_conf,
{ 'spark.rapids.sql.expression.StructsToJson': True })

assert_gpu_fallback_collect(
lambda spark : struct_to_json(spark),
'ProjectExec',
conf=conf)
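The `timestampFormat` and `timeZone` options exercised above are standard Spark `to_json` options; a plain-Spark sketch of the default-supported format (runs on CPU without RAPIDS):

```python
# Plain-Spark sketch of to_json with the timestamp format the GPU path
# supports in the tests above.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.range(1).select(f.current_timestamp().alias("ts"))
options = {"timestampFormat": "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]",
           "timeZone": "UTC"}
df.select(f.to_json(f.struct("ts"), options).alias("json")).show(truncate=False)
```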
21 changes: 21 additions & 0 deletions jenkins/spark-tests.sh
@@ -263,13 +263,29 @@ run_pyarrow_tests() {
./run_pyspark_from_build.sh -m pyarrow_test --pyarrow_test
}

run_non_utc_time_zone_tests() {
# select one time zone according to current day of week
non_utc_time_zones=("Asia/Shanghai" "Iran")
time_zones_length=${#non_utc_time_zones[@]}
# get day of week, Sunday is represented by 0 and Saturday by 6
current_date=$(date +%w)
echo "Current day of week is: ${current_date}"
time_zone_index=$((current_date % time_zones_length))
time_zone="${non_utc_time_zones[${time_zone_index}]}"
echo "Run Non-UTC tests, time zone is ${time_zone}"

# run tests
TZ=${time_zone} ./run_pyspark_from_build.sh
}
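A hedged Python sketch of the same day-of-week rotation (for readers less familiar with `date +%w`; this is not part of the commit):

```python
# Mirrors the shell logic above: `date +%w` yields Sunday=0..Saturday=6,
# and the zone index is that value modulo the list length.
import datetime

non_utc_time_zones = ["Asia/Shanghai", "Iran"]
# Python's isoweekday() is Monday=1..Sunday=7; % 7 maps Sunday to 0,
# matching the `date +%w` convention used in the script.
day_of_week = datetime.date.today().isoweekday() % 7
print(non_utc_time_zones[day_of_week % len(non_utc_time_zones)])
```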

# TEST_MODE
# - DEFAULT: all tests except cudf_udf tests
# - DELTA_LAKE_ONLY: Delta Lake tests only
# - ICEBERG_ONLY: iceberg tests only
# - AVRO_ONLY: avro tests only (with --packages option instead of --jars)
# - CUDF_UDF_ONLY: cudf_udf tests only, requires extra conda cudf-py lib
# - MULTITHREADED_SHUFFLE: shuffle tests only
# - NON_UTC_TZ: run all tests in a non-UTC time zone selected according to the current day of week.
TEST_MODE=${TEST_MODE:-'DEFAULT'}
if [[ $TEST_MODE == "DEFAULT" ]]; then
./run_pyspark_from_build.sh
@@ -321,6 +337,11 @@ if [[ "$TEST_MODE" == "DEFAULT" || "$TEST_MODE" == "PYARROW_ONLY" ]]; then
run_pyarrow_tests
fi

# Non-UTC time zone tests
if [[ "$TEST_MODE" == "NON_UTC_TZ" ]]; then
run_non_utc_time_zone_tests
fi

popd
stop-worker.sh
stop-master.sh
17 changes: 14 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -730,6 +730,7 @@ object GpuCast {
fromDataType: DataType, options: CastOptions): ColumnVector = fromDataType match {
case StringType => input.copyToColumnVector()
case DateType => input.asStrings("%Y-%m-%d")
case TimestampType if options.castToJsonString => castTimestampToJson(input)
case TimestampType => castTimestampToString(input)
case FloatType | DoubleType => castFloatingTypeToString(input)
case BinaryType => castBinToString(input, options)
@@ -773,6 +774,14 @@
}
}

private def castTimestampToJson(input: ColumnView): ColumnVector = {
// we fall back to CPU if the JSON timezone is not UTC, so it is safe
// to hard-code `Z` here for now, but we should really add a timestamp
// format to CastOptions when we add support for custom formats in
// https://github.com/NVIDIA/spark-rapids/issues/9602
input.asStrings("%Y-%m-%dT%H:%M:%S.%3fZ")
}
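For reference, the cudf strftime pattern above renders values like `2023-11-29T12:34:56.123Z`. A rough Python analogue (`%3f` is cudf-specific millisecond precision; Python's `%f` is microseconds, so the milliseconds are derived manually):

```python
# Rough Python analogue of the cudf pattern "%Y-%m-%dT%H:%M:%S.%3fZ".
from datetime import datetime, timezone

ts = datetime(2023, 11, 29, 12, 34, 56, 123000, tzinfo=timezone.utc)
print(ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z")
# -> 2023-11-29T12:34:56.123Z
```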

/**
* A 5 steps solution for concatenating string array column. <p>
* Giving an input with 3 rows:
@@ -932,7 +941,8 @@
// to be represented by the string literal `null`
val strValue = closeOnExcept(strKey) { _ =>
withResource(kvStructColumn.getChildColumnView(1)) { valueColumn =>
val valueStr = if (valueColumn.getType == DType.STRING) {
val dt = valueColumn.getType
val valueStr = if (dt == DType.STRING || dt.isDurationType || dt.isTimestampType) {
withResource(castToString(valueColumn, from.valueType, options)) { valueStr =>
addQuotes(valueStr, valueColumn.getRowCount.toInt)
}
@@ -1102,8 +1112,9 @@
colon: ColumnVector,
quote: ColumnVector): ColumnVector = {
val jsonName = StringEscapeUtils.escapeJson(inputSchema(fieldIndex).name)
val dataType = inputSchema(fieldIndex).dataType
val needsQuoting = dataType == DataTypes.StringType
val dt = inputSchema(fieldIndex).dataType
val needsQuoting = dt == DataTypes.StringType || dt == DataTypes.DateType ||
dt == DataTypes.TimestampType
withResource(input.getChildColumnView(fieldIndex)) { cv =>
withResource(ArrayBuffer.empty[ColumnVector]) { attrColumns =>
// prefix with quoted column name followed by colon
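The `needsQuoting` change above mirrors how Spark itself emits dates and timestamps as quoted JSON strings; an illustrative CPU-Spark check:

```python
# Dates (and timestamps) appear as quoted strings in Spark's JSON output,
# which is what the needsQuoting change reproduces on the GPU.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT named_struct('d', DATE'2023-11-29') AS s")
df.select(f.to_json("s").alias("json")).show(truncate=False)  # {"d":"2023-11-29"}
```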
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -621,6 +621,10 @@
}
}

def isUTCTimezone(timezoneId: ZoneId): Boolean = {
timezoneId.normalized() == UTC_TIMEZONE_ID
}

def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_))

/**
@@ -3600,22 +3604,16 @@
TypeSig.STRING,
Seq(ParamCheck("struct",
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.DECIMAL_128 +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
(TypeSig.BOOLEAN + TypeSig.STRING + TypeSig.integral + TypeSig.FLOAT +
TypeSig.DOUBLE + TypeSig.DATE + TypeSig.TIMESTAMP +
TypeSig.DECIMAL_128 +
TypeSig.DOUBLE + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested()
))),
(a, conf, p, r) => new UnaryExprMeta[StructsToJson](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (a.options.get("pretty").exists(_.equalsIgnoreCase("true"))) {
willNotWorkOnGpu("to_json option pretty=true is not supported")
}
}

override def convertToGpu(child: Expression): GpuExpression =
GpuStructsToJson(a.options, child, a.timeZoneId)
}).disabledByDefault("to_json support is experimental. See compatibility " +
(a, conf, p, r) => new GpuStructsToJsonMeta(a, conf, p, r))
.disabledByDefault("to_json support is experimental. See compatibility " +
"guide for more information."),
expr[JsonTuple](
"Returns a tuple like the function get_json_object, but it takes multiple names. " +