Add a random seed specific to datagen cases #9441

Merged
17 commits merged on Nov 15, 2023
Changes from 11 commits
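For orientation before the file diffs: the PR adds a `datagen_overrides` pytest marker so an individual test can pin the data-generation seed (with a `reason` pointing at the tracking issue for the flaky case), a `get_datagen_seed()` helper in `conftest.py` that resolves the effective seed, and `seed=None` defaults in the `data_gen.py` helpers so they pick that seed up automatically. The snippet below is a condensed illustration assembled from the hunks that follow, not an excerpt of any single file; the test name is made up.

# Hypothetical test showing the pattern this PR enables: pin the datagen seed
# for a known-flaky case and record why.
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import gen_df, StructGen, IntegerGen
from marks import datagen_overrides

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9350')
def test_pinned_seed_example():
    # No seed= argument here: gen_df falls back to conftest.get_datagen_seed(),
    # which honors the marker above, else SPARK_RAPIDS_TEST_DATAGEN_SEED, else 0.
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, StructGen([('a', IntegerGen())], nullable=False)))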
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
@@ -17,7 +17,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from marks import ignore_order, incompat, approximate_float, allow_non_gpu
from marks import ignore_order, incompat, approximate_float, allow_non_gpu, datagen_overrides
from pyspark.sql.types import *
from pyspark.sql.types import IntegralType
from spark_session import *
@@ -677,6 +677,7 @@ def test_shift_right_unsigned(data_gen):

@incompat
@approximate_float
@datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/9350")
@pytest.mark.parametrize('data_gen', _arith_data_gens_for_round, ids=idfn)
def test_decimal_bround(data_gen):
assert_gpu_and_cpu_are_equal_collect(
7 changes: 4 additions & 3 deletions integration_tests/src/main/python/cache_test.py
@@ -19,7 +19,7 @@
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from join_test import create_df
from marks import incompat, allow_non_gpu, ignore_order
from marks import incompat, allow_non_gpu, ignore_order, datagen_overrides
import pyspark.mllib.linalg as mllib
import pyspark.ml.linalg as ml

@@ -91,11 +91,12 @@ def do_join(spark):
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
@datagen_overrides(seed=0)
Collaborator:
Why no reason here?

Collaborator Author:
oh whoops, let me add that.. not intended

Collaborator Author:
I am re-running CI with these removed. I can't repro these locally.

def test_cache_expand_exec(data_gen, enable_vectorized_conf):
def op_df(spark, length=2048, seed=0):
def op_df(spark, length=2048):
cached = gen_df(spark, StructGen([
('a', data_gen),
('b', IntegerGen())], nullable=False), length=length, seed=seed).cache()
('b', IntegerGen())], nullable=False), length=length).cache()
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))

3 changes: 2 additions & 1 deletion integration_tests/src/main/python/cast_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_before_spark_330, is_spark_340_or_later, \
is_databricks113_or_later
from marks import allow_non_gpu, approximate_float
from marks import allow_non_gpu, approximate_float, datagen_overrides
from pyspark.sql.types import *
from spark_init_internal import spark_version
from datetime import date, datetime
@@ -146,6 +146,7 @@ def test_cast_string_date_non_ansi():
lambda spark: spark.createDataFrame(data_rows, "a string").select(f.col('a').cast(DateType())),
conf={'spark.rapids.sql.hasExtendedYearValues': 'false'})

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9708')
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9]'),
StringGen('[0-9]{1,4}-[0-3][0-9]-[0-5][0-9][ |T][0-3][0-9]:[0-6][0-9]:[0-6][0-9].[0-9]{0,6}Z?')],
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/cmp_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import with_cpu_session, is_before_spark_330
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + struct_gens_sample_with_decimal128_no_list, ids=idfn)
@@ -329,6 +330,7 @@ def test_in(data_gen):

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9687')
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/collection_ops_test.py
@@ -21,6 +21,7 @@
import pyspark.sql.functions as f
import pyspark.sql.utils
from spark_session import with_cpu_session, with_gpu_session
from conftest import get_datagen_seed

nested_gens = [ArrayGen(LongGen()), ArrayGen(decimal_gen_128bit),
StructGen([("a", LongGen()), ("b", decimal_gen_128bit)]),
@@ -258,8 +259,11 @@ def test_sequence_without_step(start_gen, stop_gen):

@pytest.mark.parametrize('start_gen,stop_gen,step_gen', sequence_normal_integral_gens, ids=idfn)
def test_sequence_with_step(start_gen, stop_gen, step_gen):
# Get the datagen seed we use for all datagens, since we need to call start
# on step_gen
data_gen_seed = get_datagen_seed()
# Get a step scalar from the 'step_gen' which follows the rules.
step_gen.start(random.Random(0))
step_gen.start(random.Random(data_gen_seed))
step_lit = step_gen.gen()
assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark, start_gen, stop_gen, step_gen).selectExpr(
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/conditionals_test.py
@@ -18,6 +18,7 @@
from data_gen import *
from spark_session import is_before_spark_320, is_jvm_charset_utf8
from pyspark.sql.types import *
from marks import datagen_overrides
import pyspark.sql.functions as f

def mk_str_gen(pattern):
@@ -68,6 +69,7 @@ def test_if_else_map(data_gen):
'IF(TRUE, b, c)',
'IF(a, b, c)'))

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9685')
@pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens, ids=idfn)
def test_case_when(data_gen):
@@ -130,6 +132,7 @@ def test_nvl(data_gen):
# in both cpu and gpu runs.
# E: java.lang.AssertionError: assertion failed: each serializer expression should contain\
# at least one `BoundReference`
@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9684')
@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens_nonempty_struct + map_gens_sample, ids=idfn)
def test_coalesce(data_gen):
num_cols = 20
@@ -140,7 +143,6 @@ def test_coalesce(data_gen):
for x in range(0, num_cols)], nullable=False)
command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
command_args.append(s1)
data_type = data_gen.data_type
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gen).select(
f.coalesce(*command_args)))
31 changes: 24 additions & 7 deletions integration_tests/src/main/python/conftest.py
@@ -115,6 +115,15 @@ def is_parquet_testing_tests_forced():
def should_inject_oom():
return _inject_oom != None

# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
_test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {_test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def get_datagen_seed():
return _test_datagen_random_seed

def get_limit():
return _limit

@@ -132,7 +141,12 @@ def pytest_runtest_setup(item):
global _sort_on_spark
global _sort_locally
global _inject_oom
global _test_datagen_random_seed
_inject_oom = item.get_closest_marker('inject_oom')
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
_test_datagen_random_seed = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)

order = item.get_closest_marker('ignore_order')
if order:
if order.kwargs.get('local', False):
@@ -251,12 +265,6 @@ def pytest_configure(config):
print(f"Starting with OOM injection seed: {oom_random_injection_seed}. "
"Set env variable SPARK_RAPIDS_TEST_INJECT_OOM_SEED to override.")

# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def pytest_collection_modifyitems(config, items):
r = random.Random(oom_random_injection_seed)
for item in items:
@@ -265,7 +273,16 @@ def pytest_collection_modifyitems(config, items):
# decide if OOMs should be injected, and when
injection_mode = config.getoption('test_oom_injection_mode').lower()
inject_choice = False
extras.append('DATAGEN_SEED=%s' % str(test_datagen_random_seed))
datagen_overrides = item.get_closest_marker('datagen_overrides')
if datagen_overrides:
test_datagen_random_seed_choice = datagen_overrides.kwargs.get('seed', _test_datagen_random_seed)
if test_datagen_random_seed_choice != _test_datagen_random_seed:
extras.append('DATAGEN_SEED_OVERRIDE=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(test_datagen_random_seed_choice))
else:
extras.append('DATAGEN_SEED=%s' % str(_test_datagen_random_seed))

if injection_mode == 'random':
inject_choice = r.randrange(0, 2) == 1
elif injection_mode == 'always':
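A condensed restatement of the rule the two hooks above implement (a sketch inferred from the hunks, not the literal plugin code): the seed for a test is the marker's `seed` when a `datagen_overrides` marker is present, otherwise the environment/default value, and the test-ID extra switches to `DATAGEN_SEED_OVERRIDE` only when the override actually differs from the default.

# Sketch of the effective-seed rule; names follow the diff, glue is condensed.
import os

_default_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))

def effective_seed_and_label(item):
    """Return the seed used for one pytest item plus the extra appended to its ID."""
    marker = item.get_closest_marker('datagen_overrides')
    seed = marker.kwargs.get('seed', _default_seed) if marker else _default_seed
    if marker and seed != _default_seed:
        return seed, 'DATAGEN_SEED_OVERRIDE=%s' % seed
    return seed, 'DATAGEN_SEED=%s' % seed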
1 change: 1 addition & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -402,6 +402,7 @@ def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list,
"'T'HH:mm[:ss]",
"'T'HH:mm"]

@datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/9701')
@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
@pytest.mark.parametrize('date_format', csv_supported_date_formats)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
42 changes: 26 additions & 16 deletions integration_tests/src/main/python/data_gen.py
@@ -24,7 +24,7 @@
from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
import sre_yield
import struct
from conftest import skip_unless_precommit_tests,test_datagen_random_seed
from conftest import skip_unless_precommit_tests,get_datagen_seed
import time
import os
from functools import lru_cache
@@ -756,14 +756,19 @@ def skip_if_not_utc():
# Note: as of 2023/06/06 the maximum IT data size is 7282688 bytes, so an LRU cache with maxsize 128
# will lead to 7282688 * 128 = 932 MB of additional memory usage in the edge case, which is acceptable.
@lru_cache(maxsize=128, typed=True)
def gen_df_help(data_gen, length, seed):
rand = random.Random(seed)
def gen_df_help(data_gen, length, seed_value):
rand = random.Random(seed_value)
data_gen.start(rand)
data = [data_gen.gen() for index in range(0, length)]
return data

def gen_df(spark, data_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
def gen_df(spark, data_gen, length=2048, seed=None, num_slices=None):
Collaborator:
Do we want a follow on issue to remove seed for gen_df and force us to go through the annotation?

"""Generate a spark dataframe from the given data generators."""
if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
@@ -775,7 +780,7 @@ def gen_df(spark, data_gen, length=2048, seed=test_datagen_random_seed, num_slic
if src.contains_ts():
skip_if_not_utc()

data = gen_df_help(src, length, seed)
data = gen_df_help(src, length, seed_value)

# We use `numSlices` to create an RDD with the specific number of partitions,
# which is then turned into a dataframe. If not specified, it is `None` (default spark value)
@@ -816,39 +821,44 @@ def _mark_as_lit(data, data_type):
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

def _gen_scalars_common(data_gen, count, seed=test_datagen_random_seed):
def _gen_scalars_common(data_gen, count, seed=None):
if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
src = data_gen

if seed is None:
seed_value = get_datagen_seed()
else:
seed_value = seed

# Before we get too far we need to verify that we can run with timestamps
if src.contains_ts():
skip_if_not_utc()

rand = random.Random(seed)
rand = random.Random(seed_value)
src.start(rand)
return src

def gen_scalars(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
def gen_scalars(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

def gen_scalar(data_gen, seed=test_datagen_random_seed, force_no_nulls=False):
def gen_scalar(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalars(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]

def gen_scalar_values(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
def gen_scalar_values(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values."""
src = _gen_scalars_common(data_gen, count, seed=seed)
return (src.gen(force_no_nulls=force_no_nulls) for i in range(0, count))

def gen_scalar_value(data_gen, seed=test_datagen_random_seed, force_no_nulls=False):
def gen_scalar_value(data_gen, seed=None, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]
@@ -890,18 +900,18 @@ def tmp(something):
return meta + idfn(something)
return tmp

def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen),('c', c_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def two_col_df(spark, a_gen, b_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
def two_col_df(spark, a_gen, b_gen, length=2048, seed=None, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def binary_op_df(spark, gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
def binary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return two_col_df(spark, gen, gen, length=length, seed=seed, num_slices=num_slices)

def unary_op_df(spark, gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
def unary_op_df(spark, gen, length=2048, seed=None, num_slices=None):
return gen_df(spark, StructGen([('a', gen)], nullable=False),
length=length, seed=seed, num_slices=num_slices)

@@ -974,7 +984,7 @@ def _convert_to_sql(spark_type, data):
else:
return 'CAST({} as {})'.format(d, to_cast_string(spark_type))

def gen_scalars_for_sql(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
def gen_scalars_for_sql(data_gen, count, seed=None, force_no_nulls=False):
"""Generate scalar values, but strings that can be used in selectExpr or SQL"""
src = _gen_scalars_common(data_gen, count, seed=seed)
if isinstance(data_gen, NullGen):
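The net effect on the helpers above: an explicit `seed=` argument still wins, and everything else now routes through the per-test seed, which is also what the review question about dropping `seed=` from `gen_df` is getting at. Below is a minimal sketch of that plumbing, assuming a hypothetical `gen_rows` helper; the real helpers also handle `StructGen` wrapping, the UTC timestamp check, caching, and partitioning.

import random
from conftest import get_datagen_seed

def _resolve_seed(seed=None):
    # Explicit seed wins; otherwise use the per-test seed from conftest.
    return get_datagen_seed() if seed is None else seed

def gen_rows(data_gen, length=2048, seed=None):
    # Hypothetical simplified counterpart of gen_df_help/gen_df.
    rand = random.Random(_resolve_seed(seed))
    data_gen.start(rand)
    return [data_gen.gen() for _ in range(length)]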
7 changes: 4 additions & 3 deletions integration_tests/src/main/python/expand_exec_test.py
@@ -16,16 +16,17 @@
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal
from data_gen import *
import pyspark.sql.functions as f
from marks import ignore_order
from marks import ignore_order, datagen_overrides

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
# Many Spark versions have issues sorting large decimals,
# see https://issues.apache.org/jira/browse/SPARK-40089.
@datagen_overrides(seed=0)
@ignore_order(local=True)
def test_expand_exec(data_gen):
def op_df(spark, length=2048, seed=0):
def op_df(spark, length=2048):
return gen_df(spark, StructGen([
('a', data_gen),
('b', IntegerGen())], nullable=False), length=length, seed=seed).rollup(f.col("a"), f.col("b")).agg(f.col("b"))
('b', IntegerGen())], nullable=False), length=length).rollup(f.col("a"), f.col("b")).agg(f.col("b"))

assert_gpu_and_cpu_are_equal_collect(op_df)
10 changes: 7 additions & 3 deletions integration_tests/src/main/python/generate_expr_test.py
@@ -16,7 +16,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from data_gen import *
from marks import allow_non_gpu, ignore_order
from marks import allow_non_gpu, ignore_order, datagen_overrides
from pyspark.sql.types import *
import pyspark.sql.functions as f

@@ -26,16 +26,17 @@
arrays_with_binary = [ArrayGen(BinaryGen(max_length=5))]
maps_with_binary = [MapGen(IntegerGen(nullable=False), BinaryGen(max_length=5))]

def four_op_df(spark, gen, length=2048, seed=0):
def four_op_df(spark, gen, length=2048):
return gen_df(spark, StructGen([
('a', gen),
('b', gen),
('c', gen),
('d', gen)], nullable=False), length=length, seed=seed)
('d', gen)], nullable=False), length=length)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@datagen_overrides(seed=0)
Collaborator:
Where is the reason for these?

@pytest.mark.parametrize('data_gen', explode_gens, ids=idfn)
def test_explode_makearray(data_gen):
assert_gpu_and_cpu_are_equal_collect(
@@ -44,6 +45,7 @@ def test_explode_makearray(data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@datagen_overrides(seed=0)
@pytest.mark.parametrize('data_gen', explode_gens, ids=idfn)
def test_explode_litarray(data_gen):
array_lit = with_cpu_session(
@@ -124,6 +126,7 @@ def test_explode_outer_nested_array_data(data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@datagen_overrides(seed=0)
@pytest.mark.parametrize('data_gen', explode_gens, ids=idfn)
def test_posexplode_makearray(data_gen):
assert_gpu_and_cpu_are_equal_collect(
@@ -132,6 +135,7 @@ def test_posexplode_makearray(data_gen):
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@datagen_overrides(seed=0)
@pytest.mark.parametrize('data_gen', explode_gens, ids=idfn)
def test_posexplode_litarray(data_gen):
array_lit = with_cpu_session(