diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index b75872ed8b2..d7fd941b97b 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2023, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_gpu_and_cpu_are_equal_sql
 from data_gen import *
-from marks import ignore_order, incompat, approximate_float, allow_non_gpu, datagen_overrides
+from marks import ignore_order, incompat, approximate_float, allow_non_gpu, datagen_overrides, disable_ansi_mode
 from pyspark.sql.types import *
 from pyspark.sql.types import IntegralType
 from spark_session import *
@@ -25,6 +25,10 @@
 import pyspark.sql.utils
 from datetime import timedelta
 
+_arithmetic_exception_string = 'java.lang.ArithmeticException' if is_before_spark_330() else \
+    'org.apache.spark.SparkArithmeticException' if is_before_spark_400() else \
+    'pyspark.errors.exceptions.captured.ArithmeticException'
+
 # No overflow gens here because we just focus on verifying the fallback to CPU when
 # enabling ANSI mode. But overflows will fail the tests because CPU runs raise
 # exceptions.
@@ -95,6 +99,7 @@ def _get_overflow_df(spark, data, data_type, expr):
     ).selectExpr(expr)
 
 @pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
+@disable_ansi_mode
 def test_addition(data_gen):
     data_type = data_gen.data_type
     assert_gpu_and_cpu_are_equal_collect(
@@ -119,6 +124,7 @@ def test_addition_ansi_no_overflow(data_gen):
             conf=ansi_enabled_conf)
 
 @pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
+@disable_ansi_mode
 def test_subtraction(data_gen):
     data_type = data_gen.data_type
     assert_gpu_and_cpu_are_equal_collect(
@@ -136,6 +142,7 @@ def test_subtraction(data_gen):
            DecimalGen(10, -2), DecimalGen(15, 3), DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
 @pytest.mark.parametrize('addOrSub', ['+', '-'])
+@disable_ansi_mode
 def test_addition_subtraction_mixed(lhs, rhs, addOrSub):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"a {addOrSub} b")
@@ -160,6 +167,7 @@ def test_subtraction_ansi_no_overflow(data_gen):
     _decimal_gen_38_10,
     _decimal_gen_38_neg10
     ], ids=idfn)
+@disable_ansi_mode
 def test_multiplication(data_gen):
     data_type = data_gen.data_type
     assert_gpu_and_cpu_are_equal_collect(
@@ -203,6 +211,7 @@ def test_multiplication_ansi_overflow():
 @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3), DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
+@disable_ansi_mode
 def test_multiplication_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).select(
@@ -220,6 +229,7 @@ def test_float_multiplication_mixed(lhs, rhs):
 @pytest.mark.parametrize('data_gen', [double_gen, decimal_gen_32bit_neg_scale, DecimalGen(6, 3),
            DecimalGen(5, 5), DecimalGen(6, 0), DecimalGen(7, 4), DecimalGen(15, 0), DecimalGen(18, 0),
            DecimalGen(17, 2), DecimalGen(16, 4), DecimalGen(38, 21), DecimalGen(21, 17), DecimalGen(3, -2)], ids=idfn)
+@disable_ansi_mode
 def test_division(data_gen):
     data_type = data_gen.data_type
     assert_gpu_and_cpu_are_equal_collect(
@@ -232,6 +242,7 @@ def test_division(data_gen):
 @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(4, 1), DecimalGen(5, 0), DecimalGen(5, 1), DecimalGen(10, 5)], ids=idfn)
 @pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
+@disable_ansi_mode
 def test_division_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).select(
@@ -242,12 +253,14 @@
 # instead of increasing the precision. So we have a second test that deals with a few of these use cases
 @pytest.mark.parametrize('rhs', [DecimalGen(30, 10), DecimalGen(28, 18)], ids=idfn)
 @pytest.mark.parametrize('lhs', [DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
+@disable_ansi_mode
 def test_division_mixed_larger_dec(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).select(
             f.col('a'), f.col('b'),
             f.col('a') / f.col('b')))
 
+@disable_ansi_mode
 def test_special_decimal_division():
     for precision in range(1, 39):
         for scale in range(-3, precision + 1):
@@ -260,6 +273,7 @@ def test_special_decimal_division():
 @approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases.
 @pytest.mark.parametrize('rhs', [float_gen, double_gen], ids=idfn)
 @pytest.mark.parametrize('lhs', [DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(1, -2), DecimalGen(16, 1)], ids=idfn)
+@disable_ansi_mode
 def test_float_division_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).select(
@@ -269,6 +283,7 @@ def test_float_division_mixed(lhs, rhs):
 @pytest.mark.parametrize('data_gen', integral_gens + [
     decimal_gen_32bit, decimal_gen_64bit, _decimal_gen_7_7, _decimal_gen_18_3, _decimal_gen_30_2, _decimal_gen_36_5, _decimal_gen_38_0], ids=idfn)
+@disable_ansi_mode
 def test_int_division(data_gen):
     string_type = to_cast_string(data_gen.data_type)
     assert_gpu_and_cpu_are_equal_collect(
@@ -282,12 +297,14 @@ def test_int_division(data_gen):
 @pytest.mark.parametrize('lhs', [DecimalGen(6, 5), DecimalGen(5, 4), DecimalGen(3, -2), _decimal_gen_30_2], ids=idfn)
 @pytest.mark.parametrize('rhs', [DecimalGen(13, 2), DecimalGen(6, 3), _decimal_gen_38_0, pytest.param(_decimal_gen_36_neg5, marks=pytest.mark.skipif(not is_before_spark_340() or is_databricks113_or_later(), reason='SPARK-41207'))], ids=idfn)
+@disable_ansi_mode
 def test_int_division_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(
             'a DIV b'))
 
 @pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
+@disable_ansi_mode
 def test_mod(data_gen):
     data_type = data_gen.data_type
     assert_gpu_and_cpu_are_equal_collect(
@@ -308,6 +325,7 @@ def test_mod(data_gen):
     _decimal_gen_7_7]
 
 @pytest.mark.parametrize('data_gen', _pmod_gens, ids=idfn)
+@disable_ansi_mode
 def test_pmod(data_gen):
     string_type = to_cast_string(data_gen.data_type)
     assert_gpu_and_cpu_are_equal_collect(
@@ -321,6 +339,7 @@ def test_pmod(data_gen):
 @allow_non_gpu("ProjectExec", "Pmod")
 @pytest.mark.parametrize('data_gen', test_pmod_fallback_decimal_gens + [_decimal_gen_38_0, _decimal_gen_38_10], ids=idfn)
+@disable_ansi_mode
 def test_pmod_fallback(data_gen):
     string_type = to_cast_string(data_gen.data_type)
     assert_gpu_fallback_collect(
@@ -372,8 +391,10 @@ def test_cast_neg_to_decimal_err():
     data_gen = _decimal_gen_7_7
     if is_before_spark_322():
         exception_content = "Decimal(compact,-120000000,20,0}) cannot be represented as Decimal(7, 7)"
-    elif is_databricks113_or_later() or not is_before_spark_340():
+    elif is_databricks113_or_later() or not is_before_spark_340() and is_before_spark_400():
         exception_content = "[NUMERIC_VALUE_OUT_OF_RANGE] -12 cannot be represented as Decimal(7, 7)"
+    elif not is_before_spark_400():
+        exception_content = "[NUMERIC_VALUE_OUT_OF_RANGE.WITH_SUGGESTION] -12 cannot be represented as Decimal(7, 7)"
     else:
         exception_content = "Decimal(compact, -120000000, 20, 0) cannot be represented as Decimal(7, 7)"
@@ -410,6 +431,7 @@ def test_mod_pmod_by_zero_not_ansi(data_gen):
 @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3), DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)], ids=idfn)
+@disable_ansi_mode
 def test_mod_mixed(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"a % b"))
@@ -417,6 +439,7 @@ def test_mod_mixed(lhs, rhs):
 # @pytest.mark.skipif(not is_databricks113_or_later() and not is_spark_340_or_later(), reason="https://github.com/NVIDIA/spark-rapids/issues/8330")
 @pytest.mark.parametrize('lhs', [DecimalGen(38,0), DecimalGen(37,2), DecimalGen(38,5), DecimalGen(38,-10), DecimalGen(38,7)], ids=idfn)
 @pytest.mark.parametrize('rhs', [DecimalGen(27,7), DecimalGen(30,10), DecimalGen(38,1), DecimalGen(36,0), DecimalGen(28,-7)], ids=idfn)
+@disable_ansi_mode
 def test_mod_mixed_decimal128(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr("a", "b", f"a % b"))
@@ -424,6 +447,7 @@ def test_mod_mixed_decimal128(lhs, rhs):
 # Split into 4 tests to permute https://github.com/NVIDIA/spark-rapids/issues/7553 failures
 @pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen], ids=idfn)
 @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen], ids=idfn)
+@disable_ansi_mode
 def test_pmod_mixed_numeric(lhs, rhs):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"pmod(a, b)"))
@@ -433,6 +457,7 @@ def test_pmod_mixed_numeric(lhs, rhs):
         DecimalGen(4, 2), DecimalGen(3, -2), DecimalGen(16, 7), DecimalGen(19, 0), DecimalGen(30, 10)
         ], ids=idfn)
 @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen], ids=idfn)
+@disable_ansi_mode
 def test_pmod_mixed_decimal_lhs(lhs, rhs):
     assert_gpu_fallback_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"pmod(a, b)"),
@@ -443,6 +468,7 @@ def test_pmod_mixed_decimal_lhs(lhs, rhs):
 @pytest.mark.parametrize('rhs', [DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3), DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)
     ], ids=idfn)
+@disable_ansi_mode
 def test_pmod_mixed_decimal_rhs(lhs, rhs):
     assert_gpu_fallback_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"pmod(a, b)"),
@@ -455,6 +481,7 @@ def test_pmod_mixed_decimal_rhs(lhs, rhs):
 @pytest.mark.parametrize('rhs', [DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3), DecimalGen(30, 12), DecimalGen(3, -3), DecimalGen(27, 7), DecimalGen(20, -3)
     ], ids=idfn)
+@disable_ansi_mode
 def test_pmod_mixed_decimal(lhs, rhs):
     assert_gpu_fallback_collect(
         lambda spark : two_col_df(spark, lhs, rhs).selectExpr(f"pmod(a, b)"),
@@ -466,6 +493,7 @@ def test_signum(data_gen):
         lambda spark : unary_op_df(spark, data_gen).selectExpr('signum(a)'))
 
 @pytest.mark.parametrize('data_gen', numeric_gens + _arith_decimal_gens_low_precision, ids=idfn)
+@disable_ansi_mode
 def test_unary_minus(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).selectExpr('-a'))
@@ -504,8 +532,7 @@ def test_unary_minus_ansi_overflow(data_type, value):
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, '-a').collect(),
         conf=ansi_enabled_conf,
-        error_message='java.lang.ArithmeticException' if is_before_spark_330() else \
-            'org.apache.spark.SparkArithmeticException')
+        error_message=_arithmetic_exception_string)
 
 # This just ends up being a pass through. There is no good way to force
 # a unary positive into a plan, because it gets optimized out, but this
@@ -516,6 +543,7 @@ def test_unary_positive(data_gen):
         lambda spark : unary_op_df(spark, data_gen).selectExpr('+a'))
 
 @pytest.mark.parametrize('data_gen', numeric_gens + _arith_decimal_gens_low_precision, ids=idfn)
+@disable_ansi_mode
 def test_abs(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)'))
@@ -556,10 +584,9 @@ def test_abs_ansi_overflow(data_type, value):
     GPU: One or more rows overflow for abs operation.
     """
     assert_gpu_and_cpu_error(
-            df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, 'abs(a)').collect(),
-            conf=ansi_enabled_conf,
-            error_message='java.lang.ArithmeticException' if is_before_spark_330() else \
-                'org.apache.spark.SparkArithmeticException')
+        df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, 'abs(a)').collect(),
+        conf=ansi_enabled_conf,
+        error_message=_arithmetic_exception_string)
 
 @approximate_float
 @pytest.mark.parametrize('data_gen', double_gens, ids=idfn)
@@ -613,7 +640,8 @@ def test_ceil_scale_zero(data_gen):
 @pytest.mark.parametrize('data_gen', [_decimal_gen_36_neg5, _decimal_gen_38_neg10], ids=idfn)
 def test_floor_ceil_overflow(data_gen):
     exception_type = "java.lang.ArithmeticException" if is_before_spark_330() and not is_databricks104_or_later() \
-        else "SparkArithmeticException"
+        else "SparkArithmeticException" if is_before_spark_400() else \
+        "pyspark.errors.exceptions.captured.ArithmeticException: [NUMERIC_VALUE_OUT_OF_RANGE.WITH_SUGGESTION]"
     assert_gpu_and_cpu_error(
         lambda spark: unary_op_df(spark, data_gen).selectExpr('floor(a)').collect(),
         conf={},
@@ -678,6 +706,7 @@ def test_shift_right_unsigned(data_gen):
 @approximate_float
 @datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9350")
 @pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
+@disable_ansi_mode
 def test_decimal_bround(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: unary_op_df(spark, data_gen).selectExpr(
@@ -692,6 +721,7 @@ def test_decimal_bround(data_gen):
 @approximate_float
 @datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9847")
 @pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
+@disable_ansi_mode
 def test_decimal_round(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: unary_op_df(spark, data_gen).selectExpr(
@@ -726,6 +756,7 @@ def doit(spark):
 
 @incompat
 @approximate_float
+@disable_ansi_mode
 def test_non_decimal_round_overflow():
     gen = StructGen([('byte_c', byte_gen), ('short_c', short_gen),
                      ('int_c', int_gen), ('long_c', long_gen),
@@ -1057,7 +1088,8 @@ def _div_overflow_exception_when(expr, ansi_enabled, is_lit=False):
     ansi_conf = {'spark.sql.ansi.enabled': ansi_enabled}
     err_exp = 'java.lang.ArithmeticException' if is_before_spark_330() else \
         'org.apache.spark.SparkArithmeticException' \
-        if not is_lit or not is_spark_340_or_later() else "pyspark.errors.exceptions.captured.ArithmeticException"
+        if (not is_lit or not is_spark_340_or_later()) and is_before_spark_400() else \
+        "pyspark.errors.exceptions.captured.ArithmeticException"
     err_mess = ': Overflow in integral divide' \
         if is_before_spark_340() and not is_databricks113_or_later() else \
         ': [ARITHMETIC_OVERFLOW] Overflow in integral divide'
@@ -1123,7 +1155,7 @@ def test_add_overflow_with_ansi_enabled(data, tp, expr):
         assert_gpu_and_cpu_error(
             lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
             conf=ansi_enabled_conf,
-            error_message='java.lang.ArithmeticException' if is_before_spark_330() else 'SparkArithmeticException')
+            error_message=_arithmetic_exception_string)
     elif isinstance(tp, DecimalType):
         assert_gpu_and_cpu_error(
             lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
@@ -1152,7 +1184,8 @@ def test_subtraction_overflow_with_ansi_enabled(data, tp, expr):
         assert_gpu_and_cpu_error(
            lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
            conf=ansi_enabled_conf,
-           error_message='java.lang.ArithmeticException' if is_before_spark_330() else 'SparkArithmeticException')
+           error_message='java.lang.ArithmeticException' if is_before_spark_330() else 'SparkArithmeticException' \
+               if is_before_spark_400() else "pyspark.errors.exceptions.captured.ArithmeticException:")
     elif isinstance(tp, DecimalType):
         assert_gpu_and_cpu_error(
             lambda spark: _get_overflow_df(spark, data, tp, expr).collect(),
@@ -1183,7 +1216,7 @@ def test_unary_minus_ansi_overflow_day_time_interval(ansi_enabled):
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df(spark, [timedelta(microseconds=LONG_MIN)], DayTimeIntervalType(), '-a').collect(),
         conf={'spark.sql.ansi.enabled': ansi_enabled},
-        error_message='SparkArithmeticException')
+        error_message='SparkArithmeticException' if is_before_spark_400() else "ArithmeticException")
 
 @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
 @pytest.mark.parametrize('ansi_enabled', ['false', 'true'])
@@ -1224,7 +1257,7 @@ def test_add_overflow_with_ansi_enabled_day_time_interval(ansi_enabled):
             StructType([StructField('a', DayTimeIntervalType()), StructField('b', DayTimeIntervalType())])
         ).selectExpr('a + b').collect(),
         conf={'spark.sql.ansi.enabled': ansi_enabled},
-        error_message='SparkArithmeticException')
+        error_message=_arithmetic_exception_string)
 
 @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
 @pytest.mark.parametrize('ansi_enabled', ['false', 'true'])
@@ -1244,7 +1277,7 @@ def test_subtraction_overflow_with_ansi_enabled_day_time_interval(ansi_enabled):
             StructType([StructField('a', DayTimeIntervalType()), StructField('b', DayTimeIntervalType())])
         ).selectExpr('a - b').collect(),
         conf={'spark.sql.ansi.enabled': ansi_enabled},
-        error_message='SparkArithmeticException')
+        error_message='SparkArithmeticException' if is_before_spark_400() else "ArithmeticException")
 
 @pytest.mark.skipif(is_before_spark_330(), reason='DayTimeInterval is not supported before Pyspark 3.3.0')
 def test_unary_positive_day_time_interval():
@@ -1303,7 +1336,8 @@ def _get_overflow_df_2cols(spark, data_types, values, expr):
 def test_day_time_interval_division_overflow(data_type, value_pair):
     exception_message = "SparkArithmeticException: Overflow in integral divide." \
         if is_before_spark_340() and not is_databricks113_or_later() else \
-        "SparkArithmeticException: [ARITHMETIC_OVERFLOW] Overflow in integral divide."
+        "SparkArithmeticException: [ARITHMETIC_OVERFLOW] Overflow in integral divide." if is_before_spark_400() else \
+        "ArithmeticException: [ARITHMETIC_OVERFLOW] Overflow in integral divide."
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df_2cols(spark, [DayTimeIntervalType(), data_type], value_pair, 'a / b').collect(),
         conf={},
@@ -1338,7 +1372,8 @@ def test_day_time_interval_division_round_overflow(data_type, value_pair):
 def test_day_time_interval_divided_by_zero(data_type, value_pair):
     exception_message = "SparkArithmeticException: Division by zero." \
         if is_before_spark_340() and not is_databricks113_or_later() else \
-        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero"
+        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero" if is_before_spark_400() else \
+        "ArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero"
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df_2cols(spark, [DayTimeIntervalType(), data_type], value_pair, 'a / b').collect(),
         conf={},
@@ -1349,7 +1384,8 @@ def test_day_time_interval_divided_by_zero(data_type, value_pair):
 def test_day_time_interval_divided_by_zero_scalar(zero_literal):
     exception_message = "SparkArithmeticException: Division by zero." \
         if is_before_spark_340() and not is_databricks113_or_later() else \
-        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero."
+        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero." if is_before_spark_400() else \
+        "ArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero"
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df_1col(spark, DayTimeIntervalType(), [timedelta(seconds=1)], 'a / ' + zero_literal).collect(),
         conf={},
@@ -1369,7 +1405,8 @@ def test_day_time_interval_divided_by_zero_scalar(zero_literal):
 def test_day_time_interval_scalar_divided_by_zero(data_type, value):
     exception_message = "SparkArithmeticException: Division by zero." \
         if is_before_spark_340() and not is_databricks113_or_later() else \
-        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero."
+        "SparkArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero." if is_before_spark_400() else \
+        "ArithmeticException: [INTERVAL_DIVIDED_BY_ZERO] Division by zero"
     assert_gpu_and_cpu_error(
         df_fun=lambda spark: _get_overflow_df_1col(spark, data_type, [value], 'INTERVAL 1 SECOND / a').collect(),
         conf={},
diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py
index 1adeb6964fd..6af40b99768 100644
--- a/integration_tests/src/main/python/conftest.py
+++ b/integration_tests/src/main/python/conftest.py
@@ -54,6 +54,7 @@ def array_columns_to_sort_locally():
 
 _allow_any_non_gpu = False
 _non_gpu_allowed = []
+_per_test_ansi_mode_enabled = None
 
 def is_allowing_any_non_gpu():
     return _allow_any_non_gpu
@@ -61,6 +62,11 @@ def is_allowing_any_non_gpu():
 def get_non_gpu_allowed():
     return _non_gpu_allowed
 
+
+def is_per_test_ansi_mode_enabled():
+    return _per_test_ansi_mode_enabled
+
+
 def get_validate_execs_in_gpu_plan():
     return _validate_execs_in_gpu_plan
 
@@ -210,10 +216,14 @@ def pytest_runtest_setup(item):
 
     global _allow_any_non_gpu
     global _non_gpu_allowed
+    global _per_test_ansi_mode_enabled
     _non_gpu_allowed_databricks = []
    _allow_any_non_gpu_databricks = False
     non_gpu_databricks = item.get_closest_marker('allow_non_gpu_databricks')
     non_gpu = item.get_closest_marker('allow_non_gpu')
+    _per_test_ansi_mode_enabled = None if item.get_closest_marker('disable_ansi_mode') is None \
+        else not item.get_closest_marker('disable_ansi_mode')
+
     if non_gpu_databricks:
         if is_databricks_runtime():
             if non_gpu_databricks.kwargs and non_gpu_databricks.kwargs['any']:
diff --git a/integration_tests/src/main/python/marks.py b/integration_tests/src/main/python/marks.py
index 1f326a75505..9a0bde11113 100644
--- a/integration_tests/src/main/python/marks.py
+++ b/integration_tests/src/main/python/marks.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2023, NVIDIA CORPORATION.
+# Copyright (c) 2020-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 
 allow_non_gpu_databricks = pytest.mark.allow_non_gpu_databricks
 allow_non_gpu = pytest.mark.allow_non_gpu
+disable_ansi_mode = pytest.mark.disable_ansi_mode
 validate_execs_in_gpu_plan = pytest.mark.validate_execs_in_gpu_plan
 approximate_float = pytest.mark.approximate_float
 ignore_order = pytest.mark.ignore_order
diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index c55f1976497..26388617fff 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -16,7 +16,7 @@
 import calendar, time
 from datetime import date, datetime
 from contextlib import contextmanager, ExitStack
-from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime, is_at_least_precommit_run, get_inject_oom_conf
+from conftest import is_allowing_any_non_gpu, get_non_gpu_allowed, get_validate_execs_in_gpu_plan, is_databricks_runtime, is_at_least_precommit_run, get_inject_oom_conf, is_per_test_ansi_mode_enabled
 from pyspark.sql import DataFrame
 from pyspark.sql.types import TimestampType, DateType, _acceptable_types
 from spark_init_internal import get_spark_i_know_what_i_am_doing, spark_version
@@ -41,7 +41,6 @@ def _from_scala_map(scala_map):
 # Many of these are redundant with default settings for the configs but are set here explicitly
 # to ensure any cluster settings do not interfere with tests that assume the defaults.
 _default_conf = {
-    'spark.ansi.enabled': 'false',
     'spark.rapids.sql.castDecimalToFloat.enabled': 'false',
     'spark.rapids.sql.castFloatToDecimal.enabled': 'false',
     'spark.rapids.sql.castFloatToIntegralTypes.enabled': 'false',
@@ -127,6 +126,9 @@ def with_spark_session(func, conf={}):
     """Run func that takes a spark session as input with the given configs set."""
     reset_spark_session_conf()
     _add_job_description(conf)
+    # Only set the ansi conf if not set by the test explicitly by setting the value in the dict
+    if "spark.sql.ansi.enabled" not in conf and is_per_test_ansi_mode_enabled() is not None:
+        conf["spark.sql.ansi.enabled"] = is_per_test_ansi_mode_enabled()
     _set_all_confs(conf)
     ret = func(_spark)
     _check_for_proper_return_values(ret)
@@ -205,6 +207,9 @@ def is_before_spark_350():
 def is_before_spark_351():
     return spark_version() < "3.5.1"
 
+def is_before_spark_400():
+    return spark_version() < "4.0.0"
+
 def is_spark_320_or_later():
     return spark_version() >= "3.2.0"
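
Usage note (not part of the patch): a minimal sketch of how a test is expected to opt out of ANSI mode with the new marker, based on the conftest and spark_session changes above. The test name and data generator below are illustrative assumptions, not code from this diff; only `disable_ansi_mode`, `assert_gpu_and_cpu_are_equal_collect`, `unary_op_df`, and `int_gen` come from the existing test framework.

# Hypothetical example; not part of the patch above.
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import int_gen, unary_op_df
from marks import disable_ansi_mode

@disable_ansi_mode  # pytest_runtest_setup records the marker; with_spark_session then sets
                    # spark.sql.ansi.enabled=False unless the test passes that conf explicitly.
def test_example_subtraction_runs_without_ansi():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, int_gen).selectExpr('a - 1'))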