[WIP] Move timezone check to each operator [databricks] #9482

Closed

Changes shown from 36 of the 42 commits.

Commits:
d8e77b2  Add test cases for timezone awarded operators (Oct 19, 2023)
3f781a4  Move timezone check to each operator (Oct 19, 2023)
d5a6d7a  Merge branch 23.12 (Oct 27, 2023)
b3fa3ee  Update (Oct 27, 2023)
c31b2e3  debug (Oct 27, 2023)
a7c8996  debug (Oct 27, 2023)
2878c5c  Add timezone test mark (Oct 27, 2023)
705f8b5  Minor update (Nov 1, 2023)
882b751  Fix failed cmp case on Spark311; Restore a python import; minor changes (Nov 1, 2023)
aec893c  Fix failure on Databricks (Nov 2, 2023)
7f81644  Update test cases for Databricks (Nov 2, 2023)
bcc1f5b  Update test cases for Databricks (Nov 2, 2023)
505b72e  Fix delta lake test cases. (Nov 3, 2023)
07942ea  Fix delta lake test cases. (Nov 3, 2023)
3033bc3  Remove the skip logic when time zone is not UTC (Nov 7, 2023)
a852455  Add time zone config to set non-UTC (Nov 7, 2023)
0358cd4  Add fallback case for cast_test.py (Nov 7, 2023)
f6ccadd  Add fallback case for cast_test.py (Nov 7, 2023)
21d5a69  Add fallback case for cast_test.py (Nov 8, 2023)
e2aa9da  Add fallback case for cast_test.py (Nov 8, 2023)
9eab476  Update split_list (Nov 8, 2023)
e231a80  Add fallback case for cast_test.py (Nov 8, 2023)
71928a0  Add fallback case for cast_test.py (Nov 8, 2023)
ca23932  Add fallback cases for cmp_test.py (Nov 9, 2023)
ee60bea  Add fallback tests for json_test.py (firestarman, Nov 9, 2023)
d403c59  add non_utc fallback for parquet_write qa_select and window_function … (thirtiseven, Nov 9, 2023)
dd5ad0b  Add fallback tests for conditionals_test.py (winningsix, Nov 9, 2023)
058e13e  Add fallback cases for collection_ops_test.py (Nov 9, 2023)
fc3a678  add fallback tests for date_time_test (thirtiseven, Nov 9, 2023)
938c649  clean up spark_session.py (thirtiseven, Nov 9, 2023)
befa39d  Add fallback tests for explain_test and csv_test (winningsix, Nov 9, 2023)
cf2c621  Update test case (Nov 9, 2023)
c298d5f  update test case (Nov 9, 2023)
09e772c  Add default value (Nov 10, 2023)
f43a8f9  Remove useless is_tz_utc (Nov 10, 2023)
5882cc3  Fix fallback cases (Nov 10, 2023)
7a53dc2  Add bottom check for time zone; Fix ORC check (Nov 13, 2023)
7bd9ef8  By default, ExecCheck do not check UTC time zone (Nov 13, 2023)
9817c4e  For common expr like AttributeReference, just skip the UTC checking (Nov 13, 2023)
f8505b7  For common expr like AttributeReference, just skip the UTC checking (Nov 13, 2023)
fa1c84d  For common expr like AttributeReference, just skip the UTC checking (Nov 13, 2023)
fbbbd5b  Update test cases (Nov 14, 2023)
9 changes: 6 additions & 3 deletions integration_tests/run_pyspark_from_build.sh
@@ -223,11 +223,14 @@ else
     export PYSP_TEST_spark_jars="${ALL_JARS//:/,}"
 fi
 
+# time zone will be tested
+TEST_TZ=${TEST_TZ:-UTC}
+
 # Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely
-export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
-export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
+export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
+export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TEST_TZ"
 export PYSP_TEST_spark_ui_showConsoleProgress='false'
-export PYSP_TEST_spark_sql_session_timeZone='UTC'
+export PYSP_TEST_spark_sql_session_timeZone=$TEST_TZ
 export PYSP_TEST_spark_sql_shuffle_partitions='4'
 # prevent cluster shape to change
 export PYSP_TEST_spark_dynamicAllocation_enabled='false'
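With this change the test time zone is driven by the TEST_TZ environment variable and defaults to UTC, so existing jobs keep their old behavior. To exercise the plugin under a non-UTC zone, a run would look something like `TEST_TZ=Asia/Shanghai ./integration_tests/run_pyspark_from_build.sh`; the zone id here is only an example, and any valid Java time zone id should work. Note that TEST_TZ now feeds both the JVM `-Duser.timezone` options and `spark.sql.session.timeZone`, keeping the driver, executors, and SQL session consistent.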
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/aqe_test.py
@@ -17,7 +17,7 @@
 from pyspark.sql.types import *
 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
 from data_gen import *
-from marks import ignore_order, allow_non_gpu
+from marks import ignore_order, allow_non_gpu, disable_timezone_test
 from spark_session import with_cpu_session, is_databricks113_or_later
 
 _adaptive_conf = { "spark.sql.adaptive.enabled": "true" }
@@ -195,6 +195,7 @@ def do_it(spark):
 @ignore_order(local=True)
 @allow_non_gpu('BroadcastNestedLoopJoinExec', 'Cast', 'DateSub', *db_113_cpu_bnlj_join_allow)
 @pytest.mark.parametrize('join', joins, ids=idfn)
+@disable_timezone_test
 def test_aqe_join_reused_exchange_inequality_condition(spark_tmp_path, join):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     def prep(spark):
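The new `disable_timezone_test` mark is imported from `marks`, whose definition is not part of this excerpt. As a rough sketch of the usual pattern for such marks (an assumption about the shape, not the PR's actual definition):

import pytest

# Sketch only: the real definition lives in marks.py and is not shown in
# this diff. Custom pytest marks are typically declared as simple aliases,
# so tagged tests can be deselected when running under a non-UTC TEST_TZ.
disable_timezone_test = pytest.mark.disable_timezone_test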
58 changes: 58 additions & 0 deletions integration_tests/src/main/python/asserts.py
@@ -213,6 +213,21 @@ def bring_back(spark):
             return (df.collect(), df)
         collect_type = 'COLLECT'
         return (bring_back, collect_type)
+    elif mode == "COLLECT_ERROR_WITH_DATAFRAME":
+        def bring_back(spark):
+            """
+            Return (collect_error, df). collect_error is the stringified
+            exception from collect(), or an empty string if no error occurred.
+            """
+            df = limit_func(spark)
+            collect_error = ""
+            try:
+                df.collect()
+            except Exception as e:
+                collect_error = str(e)
+            return (collect_error, df)
+        collect_type = 'COLLECT'
+        return (bring_back, collect_type)
     else:
         bring_back = lambda spark: limit_func(spark).toLocalIterator()
         collect_type = 'ITERATOR'
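The new COLLECT_ERROR_WITH_DATAFRAME mode collects the dataframe, swallows any exception, and hands back the stringified error together with the dataframe, so callers can assert on the error message and still capture the executed plan. A minimal standalone sketch of the pattern (the helper name is illustrative, not from this PR):

def collect_error_and_df(df):
    # Empty string means collect() succeeded; otherwise it holds the message.
    collect_error = ""
    try:
        df.collect()
    except Exception as e:
        collect_error = str(e)  # keep the message so callers can match on it
    return (collect_error, df)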
@@ -444,6 +459,30 @@ def assert_gpu_fallback_collect(func,
 
     assert_equal(from_cpu, from_gpu)
 
+def assert_gpu_fallback_and_collect_with_error(func,
+                                               cpu_fallback_class_name,
+                                               error_message,
+                                               conf={}):
+    (bring_back, collect_type) = _prep_func_for_compare(func, 'COLLECT_ERROR_WITH_DATAFRAME')
+    conf = _prep_incompat_conf(conf)
+
+    print('### CPU RUN ###')
+    cpu_start = time.time()
+    collect_error, cpu_df = with_cpu_session(bring_back, conf=conf)
+    assert error_message in collect_error, f"Expected error '{error_message}' did not appear in '{collect_error}'"
+    cpu_end = time.time()
+
+    print('### GPU RUN ###')
+    gpu_start = time.time()
+    collect_error, gpu_df = with_gpu_session(bring_back, conf=conf)
+    assert error_message in collect_error, f"Expected error '{error_message}' did not appear in '{collect_error}'"
+    gpu_end = time.time()
+    jvm = spark_jvm()
+    jvm.org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertDidFallBack(gpu_df._jdf, cpu_fallback_class_name)
+    print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type,
+        gpu_end - gpu_start, cpu_end - cpu_start))
+
+
 def assert_gpu_sql_fallback_collect(df_fun, cpu_fallback_class_name, table_name, sql, conf=None, debug=False):
     if conf is None:
         conf = {}
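assert_gpu_fallback_and_collect_with_error runs the query on both CPU and GPU, checks that the expected error message shows up in both runs, and then verifies that the GPU plan fell back for the given class. A hypothetical call might look like this (the data generator, fallback class, error text, and conf below are illustrative, not taken from this PR):

# Hypothetical usage: unary_op_df/timestamp_gen are the integration suite's
# data_gen helpers; the class name and error text are examples only.
assert_gpu_fallback_and_collect_with_error(
    lambda spark: unary_op_df(spark, timestamp_gen).selectExpr('cast(a as string)'),
    cpu_fallback_class_name='Cast',
    error_message='IllegalArgumentException',
    conf={'spark.sql.session.timeZone': 'Asia/Shanghai'})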
@@ -622,6 +661,25 @@ def do_it_all(spark):
         return spark.sql(sql)
     assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first)
 
+def assert_gpu_fallback_sql(df_fun, table_name, sql, fallback_class_name, conf=None):
+    """
+    Assert that the specified SQL query falls back to the CPU for the given class, and that CPU and GPU results match.
+    :param df_fun: a function that will create the dataframe
+    :param table_name: name of the table to be created from the dataframe
+    :param sql: SQL query to be run against the table
+    :param fallback_class_name: name of the class the GPU plan must fall back to
+    :param conf: any user-specified confs; empty by default
+    :return: assertion failure if the results from CPU and GPU do not match
+    """
+    if conf is None:
+        conf = {}
+    def do_it_all(spark):
+        df = df_fun(spark)
+        df.createOrReplaceTempView(table_name)
+        return spark.sql(sql)
+    assert_gpu_fallback_collect(do_it_all, fallback_class_name, conf)
+
+
 def assert_spark_exception(func, error_message):
     """
     Assert that a specific Java exception is thrown
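assert_gpu_fallback_sql is the SQL-flavored wrapper over assert_gpu_fallback_collect: it registers the dataframe as a temp view, runs the query, and asserts both the fallback and the CPU/GPU result match. A hypothetical call (the generator, SQL, and fallback class are examples, not from this PR):

# Hypothetical usage: hour() on timestamps is session-time-zone sensitive,
# so a non-UTC zone is a plausible reason for this expression to fall back.
assert_gpu_fallback_sql(
    lambda spark: unary_op_df(spark, timestamp_gen),
    table_name='tab',
    sql='SELECT hour(a) FROM tab',
    fallback_class_name='Hour',
    conf={'spark.sql.session.timeZone': 'Asia/Shanghai'})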