
Commit

Merge branch 'NVIDIA:branch-24.02' into rebalance-premerge-time
YanxuanLiu authored Dec 25, 2023
2 parents 5ee3666 + bb235c9 commit 9601a9d
Showing 32 changed files with 598 additions and 660 deletions.
20 changes: 20 additions & 0 deletions integration_tests/README.md
@@ -343,6 +343,26 @@ integration tests. For example:
$ DATAGEN_SEED=1702166057 SPARK_HOME=~/spark-3.4.0-bin-hadoop3 integration_tests/run_pyspark_from_build.sh
```
### Running with non-UTC time zone
For newly added cases, we should check that non-UTC time zones work; otherwise the non-UTC nightly CIs will fail.
The non-UTC nightly CIs verify all cases with a non-UTC time zone.
However, due to limited GPU resources, only a small number of cases are verified with a non-UTC time zone in the pre-merge CI.
When adding cases, also check that they work with a non-UTC time zone in addition to the default UTC time zone.
Please test the following time zones:
```shell
$ TZ=Iran ./integration_tests/run_pyspark_from_build.sh
$ TZ=America/Los_Angeles ./integration_tests/run_pyspark_from_build.sh
```
`Iran` is a non-DST (Daylight Saving Time) time zone and `America/Los_Angeles` is a DST time zone.
If a newly added case fails with a non-UTC time zone, allow the operator that does not support non-UTC to fall back to the CPU.
For example, add the following annotation to the case:
```python
non_utc_allow_for_sequence = ['ProjectExec'] # Update after non-utc time zone is supported for sequence
@allow_non_gpu(*non_utc_allow_for_sequence)
def test_my_new_added_case_for_sequence_operator():
    ...
```
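For context, a fuller version of such a case might look like the sketch below. This is only an illustration modeled on the sequence tests in `array_test.py`; the test name and query are hypothetical:
```python
from asserts import assert_gpu_and_cpu_are_equal_collect
from marks import allow_non_gpu

# sequence() does not yet support non-UTC time zones on the GPU, so let the
# ProjectExec that contains it fall back to the CPU for now.
non_utc_allow_for_sequence = ['ProjectExec']

@allow_non_gpu(*non_utc_allow_for_sequence)
def test_my_new_added_case_for_sequence_operator():
    # Hypothetical query; any expression that exercises sequence() works here.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.range(1).selectExpr(
            "transform(sequence(0, cast(rand(5)*10 as int) + 1), x -> x * 22) as t"))
```
When run under `TZ=Iran` or `TZ=America/Los_Angeles`, the allowed fallback keeps the case passing until sequence gains non-UTC support on the GPU.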
### Reviewing integration tests in Spark History Server
If the integration tests are run using [run_pyspark_from_build.sh](run_pyspark_from_build.sh) we have
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/array_test.py
@@ -17,7 +17,7 @@
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from data_gen import *
from conftest import is_databricks_runtime
from marks import incompat
from marks import incompat, allow_non_gpu
from spark_session import is_before_spark_313, is_before_spark_330, is_databricks113_or_later, is_spark_330_or_later, is_databricks104_or_later, is_spark_33X, is_spark_340_or_later, is_spark_330, is_spark_330cdh
from pyspark.sql.types import *
from pyspark.sql.types import IntegralType
@@ -332,11 +332,14 @@ def do_it(spark):

assert_gpu_and_cpu_are_equal_collect(do_it)

non_utc_allow_for_sequence = ['ProjectExec'] # Update after non-utc time zone is supported for sequence
@allow_non_gpu(*non_utc_allow_for_sequence)
def test_array_transform_non_deterministic():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.range(1).selectExpr("transform(sequence(0, cast(rand(5)*10 as int) + 1), x -> x * 22) as t"),
conf={'spark.rapids.sql.castFloatToIntegralTypes.enabled': True})

@allow_non_gpu(*non_utc_allow_for_sequence)
def test_array_transform_non_deterministic_second_param():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : debug_df(spark.range(1).selectExpr("transform(sequence(0, cast(rand(5)*10 as int) + 1), (x, i) -> x + i) as t")),
39 changes: 23 additions & 16 deletions integration_tests/src/main/python/asserts.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from conftest import is_incompat, should_sort_on_spark, should_sort_locally, get_float_check, get_limit, spark_jvm
from conftest import is_incompat, should_sort_on_spark, should_sort_locally, array_columns_to_sort_locally, get_float_check, get_limit, spark_jvm
from datetime import date, datetime, timedelta
from decimal import Decimal
import math
@@ -220,6 +220,20 @@ def bring_back(spark):
raise RuntimeError('Local Sort is only supported on a collect')
return (bring_back, collect_type)

# Sort each of the result sets. If there are array columns to sort,
# then sort each of those values in each row
def _sort_locally(*results):
array_columns = array_columns_to_sort_locally()
def sort_rows(rows):
if array_columns:
for r in rows:
for col in array_columns:
r[col].sort(key=_RowCmp)
rows.sort(key=_RowCmp)

for rows in results:
sort_rows(rows)

def _prep_incompat_conf(conf):
if is_incompat():
conf = dict(conf) # Make a copy before we change anything
@@ -257,8 +271,7 @@ def _assert_gpu_and_cpu_writes_are_equal(
from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)

@@ -317,8 +330,7 @@ def do_write(spark, table_name):
from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)

@@ -366,8 +378,7 @@ def assert_gpu_fallback_write(write_func,
from_cpu = with_cpu_session(cpu_bring_back, conf=conf)
from_gpu = with_cpu_session(gpu_bring_back, conf=conf)
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)
finally:
@@ -403,8 +414,7 @@ def assert_cpu_and_gpu_are_equal_collect_with_capture(func,
print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type,
gpu_end - gpu_start, cpu_end - cpu_start))
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)

@@ -446,8 +456,7 @@ def assert_gpu_fallback_collect(func,
print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type,
gpu_end - gpu_start, cpu_end - cpu_start))
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)

@@ -503,8 +512,7 @@ def run_on_gpu():
(from_cpu, from_gpu) = result_canonicalize_func_before_compare(from_cpu, from_gpu)

if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

assert_equal(from_cpu, from_gpu)

@@ -530,7 +538,7 @@ def run_on_cpu():
print('### {}: CPU TOOK {} ###'.format(collect_type,
cpu_end - cpu_start))
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
_sort_locally(from_cpu)

return from_cpu

@@ -564,8 +572,7 @@ def run_on_gpu():
print('### {}: GPU TOOK {} CPU TOOK {} ###'.format(collect_type,
gpu_end - gpu_start, cpu_end - cpu_start))
if should_sort_locally():
from_cpu.sort(key=_RowCmp)
from_gpu.sort(key=_RowCmp)
_sort_locally(from_cpu, from_gpu)

return (from_cpu, from_gpu)

6 changes: 6 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -41,13 +41,17 @@ def is_incompat():

_sort_on_spark = False
_sort_locally = False
_sort_array_columns_locally = []

def should_sort_on_spark():
return _sort_on_spark

def should_sort_locally():
return _sort_locally

def array_columns_to_sort_locally():
return _sort_array_columns_locally

_allow_any_non_gpu = False
_non_gpu_allowed = []

@@ -169,6 +173,7 @@ def get_std_input_path():
def pytest_runtest_setup(item):
global _sort_on_spark
global _sort_locally
global _sort_array_columns_locally
global _inject_oom
global _test_datagen_random_seed
_inject_oom = item.get_closest_marker('inject_oom')
@@ -188,6 +193,7 @@ def pytest_runtest_setup(item):
if order.kwargs.get('local', False):
_sort_on_spark = False
_sort_locally = True
_sort_array_columns_locally = order.kwargs.get('arrays', [])
else:
_sort_on_spark = True
_sort_locally = False
16 changes: 6 additions & 10 deletions integration_tests/src/main/python/data_gen.py
@@ -584,19 +584,11 @@ class TimestampGen(DataGen):
def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
super().__init__(TimestampNTZType() if tzinfo==None else TimestampType(), nullable=nullable)
if start is None:
# Spark supports times starting at
# "0001-01-01 00:00:00.000000"
# but it has issues if you get really close to that because it tries to do things
# in a different format which causes roundoff, so we have to add a few days, even a month,
# just to be sure
start = datetime(1, 2, 1, tzinfo=tzinfo)
start = datetime(1, 1, 1, tzinfo=tzinfo)
elif not isinstance(start, datetime):
raise RuntimeError('Unsupported type passed in for start {}'.format(start))

# Spark supports time through: "9999-12-31 23:59:59.999999"
# but in order to avoid out-of-range error in non-UTC time zone, here use 9999-12-30 instead of 12-31 as max end
# for details, refer to https://github.com/NVIDIA/spark-rapids/issues/7535
max_end = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=tzinfo)
max_end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
if end is None:
end = max_end
elif isinstance(end, timedelta):
@@ -612,6 +604,10 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
self._start_time = self._to_us_since_epoch(start)
self._end_time = self._to_us_since_epoch(end)
self._tzinfo = tzinfo

self.with_special_case(start)
self.with_special_case(end)

if (self._epoch >= start and self._epoch <= end):
self.with_special_case(self._epoch)

12 changes: 4 additions & 8 deletions integration_tests/src/main/python/date_time_test.py
@@ -422,7 +422,7 @@ def test_string_unix_timestamp_ansi_exception():
error_message="Exception",
conf=ansi_enabled_conf)

@pytest.mark.parametrize('data_gen', [StringGen('200[0-9]-0[1-9]-[0-2][1-8]')], ids=idfn)
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{4}-0[1-9]-[0-2][1-8]')], ids=idfn)
@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@allow_non_gpu(*non_utc_allow)
def test_gettimestamp(data_gen, ansi_enabled):
@@ -431,7 +431,7 @@ def test_gettimestamp(data_gen, ansi_enabled):
{'spark.sql.ansi.enabled': ansi_enabled})


@pytest.mark.parametrize('data_gen', [StringGen('0[1-9]200[0-9]')], ids=idfn)
@pytest.mark.parametrize('data_gen', [StringGen('0[1-9][0-9]{4}')], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_gettimestamp_format_MMyyyy(data_gen):
assert_gpu_and_cpu_are_equal_collect(
@@ -456,21 +456,17 @@ def test_date_format_for_date(data_gen, date_format):
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.skipif(not is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported")
def test_date_format_for_time(data_gen, date_format):
conf = {'spark.rapids.sql.nonUTC.enabled': True}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)),
conf)
lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)))

@pytest.mark.parametrize('date_format', supported_date_formats, ids=idfn)
@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
@pytest.mark.skipif(is_supported_time_zone(), reason="not all time zones are supported now, refer to https://github.com/NVIDIA/spark-rapids/issues/6839, please update after all time zones are supported")
@allow_non_gpu('ProjectExec')
def test_date_format_for_time_fall_back(data_gen, date_format):
conf = {'spark.rapids.sql.nonUTC.enabled': True}
assert_gpu_fallback_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("date_format(a, '{}')".format(date_format)),
'ProjectExec',
conf)
'ProjectExec')

@pytest.mark.parametrize('date_format', supported_date_formats + ['yyyyMMdd'], ids=idfn)
# from 0001-02-01 to 9999-12-30 to avoid 'year 0 is out of range'
43 changes: 14 additions & 29 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -16,10 +16,11 @@
import pytest

from asserts import *
from conftest import is_databricks_runtime
from conftest import is_databricks_runtime, spark_jvm
from conftest import is_not_utc
from data_gen import *
from functools import reduce
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *
from marks import *
import pyspark.sql.functions as f
@@ -624,23 +625,19 @@ def test_min_max_group_by(data_gen):
.agg(f.min('b'), f.max('b')))

# To avoid ordering issues with collect_list, sorting the arrays that are returned.
# Note, using sort_array() on the CPU, because sort_array() does not yet
# NOTE: sorting the arrays locally, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow)
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", *non_utc_allow)
@ignore_order(local=True, arrays=["blist"])
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_list_op, ids=idfn)
@pytest.mark.parametrize('use_obj_hash_agg', [True, False], ids=idfn)
def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
def doit(spark):
df = gen_df(spark, data_gen, length=100)\
return gen_df(spark, data_gen, length=100)\
.groupby('a')\
.agg(f.collect_list('b').alias("blist"))
# pull out the rdd and schema and create a new dataframe to run SortArray
# to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec
# to ObjectHashAggregateExec
return spark.createDataFrame(df.rdd, schema=df.schema).select("a", f.sort_array("blist"))
assert_gpu_and_cpu_are_equal_collect(
doit,
conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()})
Expand Down Expand Up @@ -680,28 +677,22 @@ def test_hash_groupby_collect_set_on_nested_type(data_gen):
.agg(f.sort_array(f.collect_set('b'))))


# Note, using sort_array() on the CPU, because sort_array() does not yet
# NOTE: sorting the arrays locally, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow)
@ignore_order(local=True, arrays=["collect_set"])
@allow_non_gpu("ProjectExec", *non_utc_allow)
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
def test_hash_groupby_collect_set_on_nested_array_type(data_gen):
conf = copy_and_update(_float_conf, {
"spark.rapids.sql.castFloatToString.enabled": "true",
"spark.rapids.sql.expression.SortArray": "false"
})

def do_it(spark):
df = gen_df(spark, data_gen, length=100)\
return gen_df(spark, data_gen, length=100)\
.groupby('a')\
.agg(f.collect_set('b').alias("collect_set"))
# pull out the rdd and schema and create a new dataframe to run SortArray
# to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec
# to ObjectHashAggregateExec
return spark.createDataFrame(df.rdd, schema=df.schema)\
.selectExpr("sort_array(collect_set)")

assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)

@@ -723,27 +714,21 @@ def test_hash_reduction_collect_set_on_nested_type(data_gen):
.agg(f.sort_array(f.collect_set('b'))))


# Note, using sort_array() on the CPU, because sort_array() does not yet
# NOTE: sorting the arrays locally, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", "SortArray", *non_utc_allow)
@ignore_order(local=True, arrays=["collect_set"])
@allow_non_gpu("ProjectExec", *non_utc_allow)
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
def test_hash_reduction_collect_set_on_nested_array_type(data_gen):
conf = copy_and_update(_float_conf, {
"spark.rapids.sql.castFloatToString.enabled": "true",
"spark.rapids.sql.expression.SortArray": "false"
})

def do_it(spark):
df = gen_df(spark, data_gen, length=100)\
return gen_df(spark, data_gen, length=100)\
.agg(f.collect_set('b').alias("collect_set"))
# pull out the rdd and schema and create a new dataframe to run SortArray
# to handle Spark 3.3.0+ optimization that moves SortArray from ProjectExec
# to ObjectHashAggregateExec
return spark.createDataFrame(df.rdd, schema=df.schema)\
.selectExpr("sort_array(collect_set)")

assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)

