Add a random seed specific to datagen cases #9441

Merged: 17 commits, Nov 15, 2023
Changes from 3 commits
4 changes: 4 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -294,6 +294,10 @@ EOF
fi
fi

# Set a seed to be used in the tests, for datagen
export SPARK_RAPIDS_TEST_DATAGEN_SEED=${SPARK_RAPIDS_TEST_DATAGEN_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_DATAGEN_SEED used: $SPARK_RAPIDS_TEST_DATAGEN_SEED"

# Set a seed to be used to pick random tests to inject with OOM
export SPARK_RAPIDS_TEST_INJECT_OOM_SEED=${SPARK_RAPIDS_TEST_INJECT_OOM_SEED:-`date +%s`}
echo "SPARK_RAPIDS_TEST_INJECT_OOM_SEED used: $SPARK_RAPIDS_TEST_INJECT_OOM_SEED"
8 changes: 7 additions & 1 deletion integration_tests/src/main/python/conftest.py
@@ -113,7 +113,6 @@ def is_parquet_testing_tests_forced():
_inject_oom = None

def should_inject_oom():
global _inject_oom
Collaborator:
Is the global not needed any more?

Collaborator Author:
This is something I owed @gerashegalov from a long time ago #7925 (comment).

Since we don't need to change the variable, it's not needed.
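
A minimal sketch of the scoping rule in question (not part of the diff; reads_flag and set_inject_oom are hypothetical helpers):

    _inject_oom = None

    def reads_flag():
        # reading a module-level name needs no global declaration
        return _inject_oom is not None

    def set_inject_oom(value):
        # assignment rebinds the name, so global is required here
        global _inject_oom
        _inject_oom = value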

return _inject_oom != None

def get_limit():
@@ -248,6 +247,12 @@ def pytest_configure(config):
print(f"Starting with OOM injection seed: {oom_random_injection_seed}. "
"Set env variable SPARK_RAPIDS_TEST_INJECT_OOM_SEED to override.")

# For datagen: we expect a seed to be provided by the environment, or default to 0.
# Note that tests can override their seed when calling into datagen by setting seed= in their tests.
test_datagen_random_seed = int(os.getenv("SPARK_RAPIDS_TEST_DATAGEN_SEED", 0))
print(f"Starting with datagen test seed: {test_datagen_random_seed}. "
"Set env variable SPARK_RAPIDS_TEST_DATAGEN_SEED to override.")

def pytest_collection_modifyitems(config, items):
r = random.Random(oom_random_injection_seed)
for item in items:
@@ -256,6 +261,7 @@ def pytest_collection_modifyitems(config, items):
# decide if OOMs should be injected, and when
injection_mode = config.getoption('test_oom_injection_mode').lower()
inject_choice = False
extras.append('DATAGEN_SEED=%s' % str(test_datagen_random_seed))
if injection_mode == 'random':
inject_choice = r.randrange(0, 2) == 1
elif injection_mode == 'always':
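
As the comment in pytest_configure notes, an individual test can still pin its own seed by passing seed= explicitly. A hypothetical test, assuming the usual integration-test helpers (assert_gpu_and_cpu_are_equal_collect, unary_op_df, int_gen):

    def test_my_op():
        # the explicit seed= overrides the environment-derived default
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, int_gen, seed=42))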
24 changes: 12 additions & 12 deletions integration_tests/src/main/python/data_gen.py
@@ -24,7 +24,7 @@
from spark_session import is_tz_utc, is_before_spark_340
import sre_yield
import struct
from conftest import skip_unless_precommit_tests
from conftest import skip_unless_precommit_tests, test_datagen_random_seed
import time
import os
from functools import lru_cache
@@ -761,7 +761,7 @@ def gen_df_help(data_gen, length, seed):
data = [data_gen.gen() for index in range(0, length)]
return data

def gen_df(spark, data_gen, length=2048, seed=0, num_slices=None):
def gen_df(spark, data_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
"""Generate a spark dataframe from the given data generators."""
if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
@@ -815,7 +815,7 @@ def _mark_as_lit(data, data_type):
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

def _gen_scalars_common(data_gen, count, seed=0):
def _gen_scalars_common(data_gen, count, seed=test_datagen_random_seed):
if isinstance(data_gen, list):
src = StructGen(data_gen, nullable=False)
else:
@@ -829,25 +829,25 @@ def _gen_scalars_common(data_gen, count, seed=0):
src.start(rand)
return src

def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalars(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
"""Generate scalar values."""
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

def gen_scalar(data_gen, seed=0, force_no_nulls=False):
def gen_scalar(data_gen, seed=test_datagen_random_seed, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalars(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]

def gen_scalar_values(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalar_values(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
"""Generate scalar values."""
src = _gen_scalars_common(data_gen, count, seed=seed)
return (src.gen(force_no_nulls=force_no_nulls) for i in range(0, count))

def gen_scalar_value(data_gen, seed=0, force_no_nulls=False):
def gen_scalar_value(data_gen, seed=test_datagen_random_seed, force_no_nulls=False):
"""Generate a single scalar value."""
v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls))
return v[0]
@@ -889,18 +889,18 @@ def tmp(something):
return meta + idfn(something)
return tmp

def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=0, num_slices=None):
def three_col_df(spark, a_gen, b_gen, c_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen),('c', c_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def two_col_df(spark, a_gen, b_gen, length=2048, seed=0, num_slices=None):
def two_col_df(spark, a_gen, b_gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
gen = StructGen([('a', a_gen),('b', b_gen)], nullable=False)
return gen_df(spark, gen, length=length, seed=seed, num_slices=num_slices)

def binary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
def binary_op_df(spark, gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
return two_col_df(spark, gen, gen, length=length, seed=seed, num_slices=num_slices)

def unary_op_df(spark, gen, length=2048, seed=0, num_slices=None):
def unary_op_df(spark, gen, length=2048, seed=test_datagen_random_seed, num_slices=None):
return gen_df(spark, StructGen([('a', gen)], nullable=False),
length=length, seed=seed, num_slices=num_slices)

@@ -973,7 +973,7 @@ def _convert_to_sql(spark_type, data):
else:
return 'CAST({} as {})'.format(d, to_cast_string(spark_type))

def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
def gen_scalars_for_sql(data_gen, count, seed=test_datagen_random_seed, force_no_nulls=False):
"""Generate scalar values, but strings that can be used in selectExpr or SQL"""
src = _gen_scalars_common(data_gen, count, seed=seed)
if isinstance(data_gen, NullGen):
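
The net effect of the data_gen.py changes, sketched with illustrative values: the default seed now comes from test_datagen_random_seed (bound when data_gen.py is imported) instead of the literal 0, so a whole run can be reproduced by exporting the environment variable, while any caller can still override it:

    # with SPARK_RAPIDS_TEST_DATAGEN_SEED=1234 exported before pytest starts:
    gen_df(spark, [('a', int_gen)])           # uses the environment-derived seed, 1234
    gen_df(spark, [('a', int_gen)], seed=0)   # explicit override: the pre-change behavior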