Add a random seed specific to datagen cases #9441

Merged: 17 commits, Nov 15, 2023
4 changes: 4 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -297,6 +297,10 @@ EOF
fi
fi

# Set a seed to be used in the tests, for datagen
export SPARK_RAPIDS_TEST_DATAGEN_SEED=${SPARK_RAPIDS_TEST_DATAGEN_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_DATAGEN_SEED used: $SPARK_RAPIDS_TEST_DATAGEN_SEED"

# Set a seed to be used to pick random tests to inject with OOM
export SPARK_RAPIDS_TEST_INJECT_OOM_SEED=${SPARK_RAPIDS_TEST_INJECT_OOM_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_INJECT_OOM_SEED used: $SPARK_RAPIDS_TEST_INJECT_OOM_SEED"
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
@@ -17,7 +17,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
-from marks import ignore_order, incompat, approximate_float, allow_non_gpu
+from marks import ignore_order, incompat, approximate_float, allow_non_gpu, datagen_overrides
from pyspark.sql.types import *
from pyspark.sql.types import IntegralType
from spark_session import *
@@ -585,6 +585,7 @@ def test_floor(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a)'))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9722')
@pytest.mark.skipif(is_before_spark_330(), reason='scale parameter in Floor function is not supported before Spark 3.3.0')
@pytest.mark.parametrize('data_gen', double_n_long_gens + _arith_decimal_gens_no_neg_scale, ids=idfn)
def test_floor_scale_zero(data_gen):
@@ -677,6 +678,7 @@ def test_shift_right_unsigned(data_gen):

@incompat
@approximate_float
@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9350")
@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
def test_decimal_bround(data_gen):
assert_gpu_and_cpu_are_equal_collect(
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/ast_test.py
@@ -16,7 +16,7 @@

from asserts import assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
-from marks import approximate_float
+from marks import approximate_float, datagen_overrides
from spark_session import with_cpu_session, is_before_spark_330
import pyspark.sql.functions as f

@@ -259,6 +259,7 @@ def test_lt(data_descr):
s2 < f.col('b'),
f.col('a') < f.col('b')))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9711')
@pytest.mark.parametrize('data_descr', ast_comparable_descrs, ids=idfn)
def test_lte(data_descr):
(s1, s2) = with_cpu_session(lambda spark: gen_scalars(data_descr[0], 2))
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/cache_test.py
@@ -92,10 +92,10 @@ def do_join(spark):
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
def test_cache_expand_exec(data_gen, enable_vectorized_conf):
-def op_df(spark, length=2048, seed=0):
+def op_df(spark, length=2048):
cached = gen_df(spark, StructGen([
('a', data_gen),
-('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
+('b', IntegerGen())], nullable=False), length=length).cache()
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))

3 changes: 2 additions & 1 deletion integration_tests/src/main/python/cast_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, \
is_databricks113_or_later
-from marks import allow_non_gpu, approximate_float
+from marks import allow_non_gpu, approximate_float, datagen_overrides
from pyspark.sql.types import *
from spark_init_internal import spark_version
from datetime import date, datetime
@@ -146,6 +146,7 @@ def test_cast_string_date_non_ansi():
lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())),
conf={'spark.rapids.sql.hasExtendedYearValues': 'false'})

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9708')
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9]'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9].[0-9]{0,6}Z?')],
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/cmp_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import with_cpu_session, is_before_spark_330
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn)
@@ -329,6 +330,7 @@ def test_in(data_gen):

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9687')
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/collection_ops_test.py
@@ -21,6 +21,7 @@
import pyspark.sql.functions as f
import pyspark.sql.utils
from spark_session import with_cpu_session, with_gpu_session
from conftest import get_datagen_seed

nested_gens = [ArrayGen(LongGen()), ArrayGen(decimal_gen_128bit),
StructGen([("a", LongGen()), ("b", decimal_gen_128bit)]),
@@ -258,8 +259,11 @@ def test_sequence_without_step(start_gen, stop_gen):

@pytest.mark.parametrize('start_gen,stop_gen,step_gen', sequence_normal_integral_gens, ids=idfn)
def test_sequence_with_step(start_gen, stop_gen, step_gen):
# Get the datagen seed we use for all datagens, since we need to call start
# on step_gen
data_gen_seed = get_datagen_seed()
# Get a step scalar from the 'step_gen' which follows the rules.
-step_gen.start(random.Random(0))
+step_gen.start(random.Random(data_gen_seed))
step_lit = step_gen.gen()
assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark, start_gen, stop_gen, step_gen).selectExpr(
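
Note on the change above: the step scalar is drawn by hand (outside gen_df), so starting step_gen from get_datagen_seed() instead of a hard-coded Random(0) keeps the whole test reproducible from the single SPARK_RAPIDS_TEST_DATAGEN_SEED value. A toy sketch of that idea, where draw_step is an illustrative stand-in for step_gen.start()/step_gen.gen() rather than code from the suite:

import random

def draw_step(session_seed):
    # The scalar is fully determined by the session-wide datagen seed.
    rng = random.Random(session_seed)
    return rng.randint(1, 5)

# Re-running with the same exported seed reproduces the same step value.
assert draw_step(1700000000) == draw_step(1700000000)
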
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/conditionals_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_jvm_charset_utf8
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

def mk_str_gen(pattern):
@@ -68,6 +69,7 @@ def test_if_else_map(data_gen):
'IF(TRUE, b, c)',
'IF(a, b, c)'))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9685')
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
def test_case_when(data_gen):
@@ -130,6 +132,7 @@ def test_nvl(data_gen):
# in both cpu and gpu runs.
# E: java.lang.AssertionError: assertion failed: each serializer expression should contain\
# at least one `BoundReference`
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9684')
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens_nonempty_struct + map_gens_sample, ids=idfn)
def test_coalesce(data_gen):
num_cols = 20
@@ -140,7 +143,6 @@ def test_coalesce(data_gen):
for x in range(0, num_cols)], nullable=False)
command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
command_args.append(s1)
-data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gen).select(
f.coalesce(*command_args)))
25 changes: 24 additions & 1 deletion integration_tests/src/main/python/conftest.py
@@ -113,9 +113,17 @@ def is_parquet_testing_tests_forced():
_inject_oom = None

def should_inject_oom():
-global _inject_oom

Review comment (Collaborator): Is the global not needed any more?

Reply (Author): This is something I owed @gerashegalov from a long time ago #7925 (comment). Since we don't need to change the variable, it's not needed.

return _inject_oom != None

# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
_test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {_test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def get_datagen_seed():
return _test_datagen_random_seed

def get_limit():
return _limit

@@ -133,7 +141,12 @@ def pytest_runtest_setup(item):
global _sort_on_spark
global _sort_locally
global _inject_oom
global _test_datagen_random_seed
_inject_oom = item.get_closest_marker('inject_oom')
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
_test_datagen_random_seed = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)

order = item.get_closest_marker('ignore_order')
if order:
if order.kwargs.get('local', False):
@@ -260,6 +273,16 @@ def pytest_collection_modifyitems(config, items):
# decide if OOMs should be injected, and when
injection_mode = config.getoption('test_oom_injection_mode').lower()
inject_choice = False
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
test_datagen_random_seed_choice = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)
if test_datagen_random_seed_choice != _test_datagen_random_seed:
extras.append('DATAGEN_SEED_OVERRIDE=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(_test_datagen_random_seed))

if injection_mode == 'random':
inject_choice = r.randrange(0, 2) == 1
elif injection_mode == 'always':
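
Taken together, the conftest.py changes above give the seed a simple precedence: a per-test datagen_overrides(seed=...) marker wins, otherwise the SPARK_RAPIDS_TEST_DATAGEN_SEED environment variable applies, and if that is unset the seed defaults to 0. A minimal self-contained sketch of that resolution order (resolve_datagen_seed and _session_seed are illustrative names, not the plugin code itself):

import os

_session_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))

def resolve_datagen_seed(marker_seed=None):
    # A test marked with @datagen_overrides(seed=...) pins its own seed;
    # every other test shares the session-wide seed.
    return _session_seed if marker_seed is None else marker_seed

assert resolve_datagen_seed() == _session_seed
assert resolve_datagen_seed(marker_seed=0) == 0

The pytest_collection_modifyitems change then tags each test id with DATAGEN_SEED=... or DATAGEN_SEED_OVERRIDE=..., so the seed a failing test actually used is visible in the report.
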
1 change: 1 addition & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -402,6 +402,7 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list,
"'T'HH:mm[:ss]",
"'T'HH:mm"]

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9701')
@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
@pytest.mark.parametrize('date_format', csv_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
42 changes: 26 additions & 16 deletions integration_tests/src/main/python/data_gen.py
@@ -24,7 +24,7 @@
from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
import sre_yield
import struct
-from conftest import skip_unless_precommit_tests
+from conftest import skip_unless_precommit_tests,get_datagen_seed
import time
import os
from functools import lru_cache
@@ -756,14 +756,19 @@ def skip_if_not_utc():
# Note: Current(2023/06/06) maximum IT data size is 7282688 bytes, so LRU cache with maxsize 128
# will lead to 7282688 * 128 = 932 MB additional memory usage in edge case, which is acceptable.
@lru_cache(maxsize=128, typed=True)
-def gen_df_help(data_gen, length, seed):
-rand = random.Random(seed)
+def gen_df_help(data_gen, length, seed_value):
+rand = random.Random(seed_value)
data_gen.start(rand)
data = [data_gen.gen() for index in range(0, length)]
return data

-def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
+def gen_df(spark, data_gen, length=2048, seed=None, num_slices=None):

Review comment (Collaborator): Do we want a follow on issue to remove seed for gen_df and force us to go through the annotation?

"""Generate a spark dataframe from the given data generators."""
if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
@@ -775,7 +780,7 @@ def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
if src.contains_ts():
skip_if_not_utc()

-data = gen_df_help(src, length, seed)
+data = gen_df_help(src, length, seed_value)

# We use `numSlices` to create an RDD with the specific number of partitions,
# which is then turned into a dataframe. If not specified, it is `None` (default spark value)
@@ -816,39 +821,44 @@ def _mark_as_lit(data, data_type):
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

-def _gen_scalars_common(data_gen, count, seed=0):
+def _gen_scalars_common(data_gen, count, seed=None):
if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
src = data_gen

if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

# Before we get too far we need to verify that we can run with timestamps
if src.contains_ts():
skip_if_not_utc()

-rand = random.Random(seed)
+rand = random.Random(seed_value)
src.start(rand)
return src

-def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
+def gen_scalars(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

-def gen_scalar(data_gen, seed=0, force_no_nulls=False):
+def gen_scalar(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalars(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]

-def gen_scalar_values(data_gen, count, seed=0, force_no_nulls=False):
+def gen_scalar_values(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
src = _gen_scalars_common(data_gen, count, seed=seed)
return (src.gen(force_no_nulls=force_no_nulls) for i in range(0, count))

-def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
+def gen_scalar_value(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]
@@ -890,18 +900,18 @@ def tmp(something):
return meta + idfn(something)
return tmp

-def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=0, num_slices=None):
+def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen),('c', c_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

-def two_col_df(spark, a_gen, b_gen, length=2048, seed=0, num_slices=None):
+def two_col_df(spark, a_gen, b_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

-def binary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
+def binary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return two_col_df(spark, gen, gen, length=length, seed=seed, num_slices=num_slices)

-def unary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
+def unary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return gen_df(spark, StructGen([('a', gen)], nullable=False),
length=length, seed=seed, num_slices=num_slices)

@@ -974,7 +984,7 @@ def _convert_to_sql(spark_type, data):
else:
return 'CAST({} as {})'.format(d, to_cast_string(spark_type))

-def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
+def gen_scalars_for_sql(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values, but strings that can be used in selectExpr or SQL"""
src = _gen_scalars_common(data_gen, count, seed=seed)
if isinstance(data_gen, NullGen):
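
One detail worth noting in the data_gen.py changes: gen_df_help is memoized with lru_cache, so gen_df resolves seed=None to a concrete value before the cached call, and the effective seed therefore becomes part of the cache key. A small sketch of that pattern, using a placeholder row generator rather than the real data generators:

import random
from functools import lru_cache

@lru_cache(maxsize=128)
def _cached_rows(length, seed_value):
    # The concrete seed_value participates in the cache key, so runs with
    # different seeds never reuse each other's generated data.
    rng = random.Random(seed_value)
    return tuple(rng.randint(0, 10**6) for _ in range(length))

def make_rows(length=8, seed=None, session_seed=0):
    # Mirrors the gen_df logic: None means "use the session-wide datagen seed".
    return _cached_rows(length, session_seed if seed is None else seed)

assert make_rows() == make_rows()        # same resolved seed, cached data is reused
assert make_rows(seed=1) != make_rows()  # a pinned seed produces its own data
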
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/expand_exec_test.py
@@ -23,9 +23,9 @@
# see https://issues.apache.org/jira/browse/SPARK-40089.
@ignore_order(local=True)
def test_expand_exec(data_gen):
-def op_df(spark, length=2048, seed=0):
+def op_df(spark, length=2048):
return gen_df(spark, StructGen([
('a', data_gen),
-('b', IntegerGen())], nullable=False), length=length, seed=seed).rollup(f.col("a"), f.col("b")).agg(f.col("b"))
+('b', IntegerGen())], nullable=False), length=length).rollup(f.col("a"), f.col("b")).agg(f.col("b"))

assert_gpu_and_cpu_are_equal_collect(op_df)
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -26,12 +26,12 @@
arrays_with_binary = [ArrayGen(BinaryGen(max_length=5))]
maps_with_binary = [MapGen(IntegerGen(nullable=False), BinaryGen(max_length=5))]

-def four_op_df(spark, gen, length=2048, seed=0):
+def four_op_df(spark, gen, length=2048):
return gen_df(spark, StructGen([
('a', gen),
('b', gen),
('c', gen),
-('d', gen)], nullable=False), length=length, seed=seed)
+('d', gen)], nullable=False), length=length)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/map_test.py
@@ -18,7 +18,7 @@
assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
from conftest import is_databricks_runtime
-from marks import allow_non_gpu, ignore_order
+from marks import allow_non_gpu, ignore_order, datagen_overrides
from spark_session import is_before_spark_330, is_databricks104_or_later, is_databricks113_or_later, is_spark_33X, is_spark_340_or_later
from pyspark.sql.functions import create_map, col, lit, row_number
from pyspark.sql.types import *
@@ -186,6 +186,7 @@ def query_map_scalar(spark):


@allow_non_gpu('WindowLocalExec')
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9683')
@pytest.mark.parametrize('data_gen', supported_key_map_gens, ids=idfn)
def test_map_scalars_supported_key_types(data_gen):
key_gen = data_gen._key_gen
1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
@@ -32,3 +32,4 @@
delta_lake = pytest.mark.delta_lake
large_data_test = pytest.mark.large_data_test
pyarrow_test = pytest.mark.pyarrow_test
datagen_overrides = pytest.mark.datagen_overrides
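
If the integration tests run pytest with strict marker checking, a new mark name generally also has to be registered so it does not trigger unknown-marker warnings; the repository may already handle this elsewhere, but a generic sketch of the usual registration hook (the description string is illustrative) looks like:

# conftest.py: generic pytest pattern, not copied from this repository
def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "datagen_overrides(seed, reason): force a specific datagen seed for one test")
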
1 change: 1 addition & 0 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -457,6 +457,7 @@ def generate_map_with_empty_validity(spark, path):
lambda spark, path: spark.read.parquet(path),
data_path)

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9701')
@pytest.mark.parametrize('data_gen', parquet_nested_datetime_gen, ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase_write', ['CORRECTED', 'LEGACY'])
1 change: 1 addition & 0 deletions integration_tests/src/main/python/window_function_test.py
@@ -355,6 +355,7 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9682")
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls,
_grpkey_longs_with_nulls,