[WIP] Move timezone check to each operator [databricks] #9482

Closed
Commits (42)
d8e77b2
Add test cases for timezone-aware operators
Oct 19, 2023
3f781a4
Move timezone check to each operator
Oct 19, 2023
d5a6d7a
Merge branch 23.12
Oct 27, 2023
b3fa3ee
Update
Oct 27, 2023
c31b2e3
debug
Oct 27, 2023
a7c8996
debug
Oct 27, 2023
2878c5c
Add timezone test mark
Oct 27, 2023
705f8b5
Minor update
Nov 1, 2023
882b751
Fix failed cmp case on Spark311; Restore a python import; minor changes
Nov 1, 2023
aec893c
Fix failure on Databricks
Nov 2, 2023
7f81644
Update test cases for Databricks
Nov 2, 2023
bcc1f5b
Update test cases for Databricks
Nov 2, 2023
505b72e
Fix delta lake test cases.
Nov 3, 2023
07942ea
Fix delta lake test cases.
Nov 3, 2023
3033bc3
Remove the skip logic when time zone is not UTC
Nov 7, 2023
a852455
Add time zone config to set non-UTC
Nov 7, 2023
0358cd4
Add fallback case for cast_test.py
Nov 7, 2023
f6ccadd
Add fallback case for cast_test.py
Nov 7, 2023
21d5a69
Add fallback case for cast_test.py
Nov 8, 2023
e2aa9da
Add fallback case for cast_test.py
Nov 8, 2023
9eab476
Update split_list
Nov 8, 2023
e231a80
Add fallback case for cast_test.py
Nov 8, 2023
71928a0
Add fallback case for cast_test.py
Nov 8, 2023
ca23932
Add fallback cases for cmp_test.py
Nov 9, 2023
ee60bea
Add fallback tests for json_test.py
firestarman Nov 9, 2023
d403c59
add non_utc fallback for parquet_write qa_select and window_function …
thirtiseven Nov 9, 2023
dd5ad0b
Add fallback tests for conditionals_test.py
winningsix Nov 9, 2023
058e13e
Add fallback cases for collection_ops_test.py
Nov 9, 2023
fc3a678
add fallback tests for date_time_test
thirtiseven Nov 9, 2023
938c649
clean up spark_session.py
thirtiseven Nov 9, 2023
befa39d
Add fallback tests for explain_test and csv_test
winningsix Nov 9, 2023
cf2c621
Update test case
Nov 9, 2023
c298d5f
update test case
Nov 9, 2023
09e772c
Add default value
Nov 10, 2023
f43a8f9
Remove useless is_tz_utc
Nov 10, 2023
5882cc3
Fix fallback cases
Nov 10, 2023
7a53dc2
Add bottom check for time zone; Fix ORC check
Nov 13, 2023
7bd9ef8
By default, ExecCheck does not check UTC time zone
Nov 13, 2023
9817c4e
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
f8505b7
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
fa1c84d
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
fbbbd5b
Update test cases
Nov 14, 2023
146 changes: 125 additions & 21 deletions integration_tests/src/main/python/json_test.py
@@ -19,8 +19,8 @@
assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
from datetime import timezone
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order, disable_timezone_test
from conftest import is_databricks_runtime, is_not_utc, is_utc
from marks import approximate_float, allow_non_gpu, ignore_order
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_before_spark_340, \
is_before_spark_341

@@ -182,10 +182,11 @@ def test_json_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_li
"'T'HH:mm[:ss]",
"'T'HH:mm"]


@pytest.mark.xfail(is_not_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
@disable_timezone_test
def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_enabled_list):
full_format = date_format + ts_part
data_gen = TimestampGen()
@@ -204,25 +205,66 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
.json(data_path),
conf=updated_conf)

@allow_non_gpu('FileSourceScanExec', 'ProjectExec')
@allow_non_gpu('FileSourceScanExec', 'BatchScanExec')
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
Collaborator:

should be xfail?

Collaborator (Author):

We are testing the fallback logic for non-UTC time zones here, and will remove these tests in the future. For the UTC time zone, simply skipping is fine.
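For reference, a minimal sketch of the paired-test pattern this PR applies across the suite (the hour(a) expression and test names are hypothetical; is_utc/is_not_utc come from conftest, allow_non_gpu from marks, and the assert helpers and generators from the existing integration-test utilities):

import pytest
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from conftest import is_not_utc, is_utc
from data_gen import unary_op_df, timestamp_gen
from marks import allow_non_gpu

# Correctness test: expected to fail under a non-UTC session time zone until
# https://github.com/NVIDIA/spark-rapids/issues/9653 is done, so it is marked
# xfail rather than skipped.
@pytest.mark.xfail(is_not_utc(), reason="https://github.com/NVIDIA/spark-rapids/issues/9653")
def test_hour():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, timestamp_gen).selectExpr('hour(a)'))

# Companion fallback test: only meaningful when the session time zone is not
# UTC, so it is skipped (not xfailed) when running under UTC.
@pytest.mark.skipif(is_utc(), reason="fallback only triggers for non-UTC")
@allow_non_gpu('ProjectExec')
def test_hour_for_non_utc():
    assert_gpu_fallback_collect(
        lambda spark: unary_op_df(spark, timestamp_gen).selectExpr('hour(a)'),
        'ProjectExec')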

@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_json_ts_formats_round_trip_for_non_utc(spark_tmp_path, date_format, ts_part,
v1_enabled_list):
full_format = date_format + ts_part
data_gen = TimestampGen()
gen = StructGen([('a', data_gen)], nullable=False)
data_path = spark_tmp_path + '/JSON_DATA'
schema = gen.data_type
with_cpu_session(
lambda spark: gen_df(spark, gen).write \
.option('timestampFormat', full_format).json(data_path))
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list})
cpu_fallback_class_name = "FileSourceScanExec" if v1_enabled_list == "json" else "BatchScanExec"
assert_gpu_fallback_collect(
lambda spark: spark.read.schema(schema)\
.option('timestampFormat', full_format).json(data_path),
cpu_fallback_class_name,
conf=updated_conf)


@allow_non_gpu('FileSourceScanExec', 'ProjectExec', 'BatchScanExec')
@pytest.mark.skipif(is_before_spark_341(), reason='`TIMESTAMP_NTZ` is only supported in PySpark 341+')
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_json_ts_formats_round_trip_ntz_fallback(spark_tmp_path, date_format, ts_part,
v1_enabled_list):
json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, 'TIMESTAMP_NTZ',
v1_enabled_list, False)


@pytest.mark.xfail(is_not_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@pytest.mark.skipif(is_before_spark_341(), reason='`TIMESTAMP_NTZ` is only supported in PySpark 341+')
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
@disable_timezone_test
def test_json_ts_formats_round_trip_ntz_v1(spark_tmp_path, date_format, ts_part, timestamp_type):
json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'json', 'FileSourceScanExec')
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_json_ts_formats_round_trip_ltz(spark_tmp_path, date_format, ts_part, v1_enabled_list):
json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, 'TIMESTAMP_LTZ',
v1_enabled_list, False)

@allow_non_gpu('BatchScanExec', 'ProjectExec')

@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@allow_non_gpu('BatchScanExec', 'ProjectExec', 'FileSourceScanExec')
@pytest.mark.skipif(is_before_spark_341(), reason='`TIMESTAMP_NTZ` is only supported in PySpark 341+')
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
@disable_timezone_test
def test_json_ts_formats_round_trip_ntz_v2(spark_tmp_path, date_format, ts_part, timestamp_type):
json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, '', 'BatchScanExec')
@pytest.mark.parametrize('v1_list', ["", "json"])
def test_json_ts_formats_round_trip_ltz_for_non_utc(spark_tmp_path, date_format, ts_part, v1_list):
json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, 'TIMESTAMP_LTZ',
v1_list, True)


def json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, v1_enabled_list, cpu_scan_class):
def json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type,
v1_enabled_list, force_fallback_test):
cpu_scan_class = "FileSourceScanExec" if v1_enabled_list == "json" else "BatchScanExec"
full_format = date_format + ts_part
data_gen = TimestampGen(tzinfo=None if timestamp_type == "TIMESTAMP_NTZ" else timezone.utc)
gen = StructGen([('a', data_gen)], nullable=False)
@@ -245,14 +287,15 @@ def do_read(spark):
.json(data_path)


if timestamp_type == "TIMESTAMP_LTZ":
if timestamp_type == "TIMESTAMP_LTZ" and not force_fallback_test:
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark : do_read(spark),
exist_classes = 'Gpu' + cpu_scan_class,
non_exist_classes = cpu_scan_class,
conf=updated_conf)
else:
# we fall back to CPU due to "unsupported data types in output: TimestampNTZType"
# or unsupported timezone
assert_gpu_fallback_collect(
lambda spark : do_read(spark),
cpu_fallback_class_name = cpu_scan_class,
@@ -386,6 +429,8 @@ def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, an
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)


@pytest.mark.xfail(is_not_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@approximate_float
@pytest.mark.parametrize('filename', [
'timestamps.json',
@@ -398,7 +443,6 @@ def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, an
'CORRECTED',
'EXCEPTION'
])
@disable_timezone_test
def test_json_read_valid_timestamps(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy, \
spark_tmp_table_factory):
updated_conf = copy_and_update(_enable_all_types_conf,
@@ -407,6 +451,25 @@ def test_json_read_valid_timestamps(std_input_path, filename, schema, read_func,
f = read_func(std_input_path + '/' + filename, schema, spark_tmp_table_factory, {})
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)


@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@approximate_float
@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', ['LEGACY', 'CORRECTED', 'EXCEPTION'])
def test_json_read_valid_timestamps_for_non_utc(std_input_path, read_func, ansi_enabled,
time_parser_policy, spark_tmp_table_factory):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
test_f = read_func(std_input_path + '/timestamps.json', _timestamp_schema,
spark_tmp_table_factory)
assert_gpu_fallback_collect(test_f,
cpu_fallback_class_name='FileSourceScanExec',
conf=updated_conf)


@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
@@ -456,7 +519,8 @@ def test_json_read_count(spark_tmp_path, v1_enabled_list):
lambda spark : spark.read.schema(schema).json(data_path),
conf=updated_conf)

@disable_timezone_test

@pytest.mark.xfail(is_not_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
def test_from_json_map():
# The test here is working around some inconsistencies in how the keys are parsed for maps
# on the GPU the keys are dense, but on the CPU they are sparse
@@ -466,6 +530,19 @@
.select(f.from_json(f.col('a'), 'MAP<STRING,STRING>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})


@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@allow_non_gpu('ProjectExec')
def test_from_json_map_for_non_utc():
# The test here is working around some inconsistencies in how the keys are parsed for maps
# on the GPU the keys are dense, but on the CPU they are sparse
json_string_gen = StringGen(r'{"a": "[0-9]{0,5}"(, "b": "[A-Z]{0,5}")?}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, json_string_gen)
.select(f.from_json(f.col('a'), 'MAP<STRING,STRING>')),
conf={"spark.rapids.sql.expression.JsonToStructs": True})


@allow_non_gpu('ProjectExec', 'JsonToStructs')
def test_from_json_map_fallback():
# The test here is working around some inconsistencies in how the keys are parsed for maps
@@ -552,15 +629,15 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
all_confs = {'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.rapids.sql.format.json.read.enabled': True,
'spark.rapids.sql.format.json.enabled': True}
gen_list =[('k0', LongGen(nullable=False, min_val=0, max_val=0)),
('k1', LongGen(nullable=False, min_val=1, max_val=1)),
('k2', LongGen(nullable=False, min_val=2, max_val=2)),
('k3', LongGen(nullable=False, min_val=3, max_val=3)),
('v0', LongGen()),
('v1', LongGen()),
('v2', LongGen()),
('v3', LongGen())]

gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/JSON_DATA'
with_cpu_session(
@@ -571,6 +648,7 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
conf=all_confs)


@pytest.mark.xfail(is_not_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@pytest.mark.parametrize('data_gen', [byte_gen,
boolean_gen,
short_gen,
@@ -591,7 +669,6 @@ def test_read_case_col_name(spark_tmp_path, v1_enabled_list, col_name):
pytest.param(True, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/9517')),
False
])
@disable_timezone_test
def test_structs_to_json(spark_tmp_path, data_gen, ignore_null_fields, pretty):
struct_gen = StructGen([
('a', data_gen),
@@ -615,4 +692,31 @@ def struct_to_json(spark):

assert_gpu_and_cpu_are_equal_collect(
lambda spark : struct_to_json(spark),
conf=conf)


@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
@pytest.mark.parametrize('data_gen', [byte_gen, boolean_gen, short_gen, int_gen, long_gen,
float_gen, double_gen, date_gen, timestamp_gen,
string_gen], ids=idfn)
@pytest.mark.parametrize('ignore_null_fields', ['true', 'false'])
@pytest.mark.parametrize('pretty', ['true', 'false'])
@allow_non_gpu('ProjectExec')
def test_structs_to_json_for_non_utc(data_gen, ignore_null_fields, pretty):
struct_gen = StructGen([
('a', data_gen),
("b", StructGen([('child', data_gen)], nullable=True)),
("c", ArrayGen(StructGen([('child', data_gen)], nullable=True))),
("d", MapGen(LongGen(nullable=False), data_gen)),
("d", MapGen(StringGen('[A-Za-z0-9]{0,10}', nullable=False), data_gen)),
("e", ArrayGen(MapGen(LongGen(nullable=False), data_gen), nullable=True)),
], nullable=False)
gen = StructGen([('my_struct', struct_gen)], nullable=False)
options = {'ignoreNullFields': ignore_null_fields, 'pretty': pretty}
def struct_to_json(spark):
df = gen_df(spark, gen)
return df.withColumn("my_json", f.to_json("my_struct", options)).drop("my_struct")

conf = copy_and_update(_enable_all_types_conf,
{'spark.rapids.sql.expression.StructsToJson': True})
assert_gpu_and_cpu_are_equal_collect(lambda spark: struct_to_json(spark), conf=conf)