Update tests and documentation for spark.sql.timestampType when reading CSV/JSON #9159

Merged 11 commits on Sep 28, 2023
9 changes: 6 additions & 3 deletions docs/compatibility.md
@@ -123,9 +123,9 @@ Only a limited set of formats are supported when parsing dates.

### CSV Timestamps
The CSV parser does not support time zones. It will ignore any trailing time zone information,
despite the format asking for a `XXX` or `[XXX]`. As such it is off by default and you can enable it
by setting [`spark.rapids.sql.csvTimestamps.enabled`](configs.md#sql.csvTimestamps.enabled) to
`true`.
despite the format asking for a `XXX` or `[XXX]`. The CSV parser does not support the `TimestampNTZ`
type and will fall back to CPU if `spark.sql.timestampType` is set to `TIMESTAMP_NTZ` or if an
explicit schema is provided that contains the `TimestampNTZ` type.
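As an illustrative sketch (not part of this diff; the path and format string are made up), a read like the following can fall back to the CPU when schema inference produces a `TimestampNTZ` column:

```python
# Hypothetical example: with the session timestamp type set to TIMESTAMP_NTZ,
# a timestamp column inferred from the CSV becomes TimestampNTZType, which the
# plugin cannot produce on the GPU, so the scan runs on the CPU instead.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
df = (spark.read
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .csv("/data/events.csv"))  # example path
```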

The formats supported for timestamps are limited similar to dates. The first part of the format
must be a supported date format. The second part must start with a `'T'` to separate the time
@@ -350,6 +350,9 @@ Spark will treat them as invalid inputs and will just return `null`.

### JSON Timestamps

The JSON parser does not support the `TimestampNTZ` type and will fall back to CPU if `spark.sql.timestampType` is
set to `TIMESTAMP_NTZ` or if an explicit schema is provided that contains the `TimestampNTZ` type.
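A rough sketch of the second condition (the schema and path below are illustrative, not from this change): an explicit `TimestampNTZ` field in the read schema also forces the scan onto the CPU.

```python
# Hypothetical example: an explicit schema containing TimestampNTZType (Spark 3.4+)
# causes the JSON scan to fall back to the CPU.
from pyspark.sql.types import StructType, StructField, StringType, TimestampNTZType

schema = StructType([
    StructField("event", StringType()),
    StructField("ts", TimestampNTZType())])
df = spark.read.schema(schema).json("/data/events.json")  # example path
```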

There is currently no support for reading numeric values as timestamps, and null values are returned instead
([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround is to read the values as longs and then
cast them to timestamp.
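A minimal sketch of that workaround (the column name and the assumption that the values are epoch seconds are illustrative):

```python
# Hypothetical example: read the numeric field as a long, then cast to timestamp.
# Casting a long to timestamp interprets the value as seconds since the epoch.
from pyspark.sql.types import StructType, StructField, LongType
import pyspark.sql.functions as f

schema = StructType([StructField("ts", LongType())])
df = (spark.read.schema(schema).json("/data/epoch_seconds.json")  # example path
      .withColumn("ts", f.col("ts").cast("timestamp")))
```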
52 changes: 52 additions & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -551,6 +551,58 @@ def test_csv_read_count(spark_tmp_path):

    assert_gpu_and_cpu_row_counts_equal(lambda spark: spark.read.csv(data_path))

@allow_non_gpu('FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec')
@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
@pytest.mark.parametrize('date_format', csv_supported_date_formats)
@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
def test_csv_infer_schema_timestamp_ntz_v1(spark_tmp_path, date_format, ts_part, timestamp_type):
    csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'csv', 'FileSourceScanExec')

@allow_non_gpu('BatchScanExec', 'FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec')
@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
@pytest.mark.parametrize('date_format', csv_supported_date_formats)
@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
def test_csv_infer_schema_timestamp_ntz_v2(spark_tmp_path, date_format, ts_part, timestamp_type):
    csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, '', 'BatchScanExec')

def csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, v1_enabled_list, cpu_scan_class):
    full_format = date_format + ts_part
    # specify to use no timezone rather than defaulting to UTC
    data_gen = TimestampGen(tzinfo=None)
    gen = StructGen([('a', data_gen)], nullable=False)
    data_path = spark_tmp_path + '/CSV_DATA'
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write
            .option('timestampFormat', full_format)
            .csv(data_path))

    def do_read(spark):
        return spark.read.option("inferSchema", "true") \
            .option('timestampFormat', full_format) \
            .csv(data_path)

    conf = { 'spark.sql.timestampType': timestamp_type,
             'spark.sql.sources.useV1SourceList': v1_enabled_list }

    # determine whether Spark CPU infers TimestampType or TimestampNtzType
    inferred_type = with_cpu_session(
        lambda spark : do_read(spark).schema["_c0"].dataType.typeName(), conf=conf)

    if inferred_type == "timestamp_ntz":
        # we fall back to CPU due to "unsupported data types in output: TimestampNTZType"
        assert_gpu_fallback_collect(
            lambda spark: do_read(spark),
            cpu_fallback_class_name = cpu_scan_class,
            conf = conf)
    else:
        assert_cpu_and_gpu_are_equal_collect_with_capture(
            lambda spark: do_read(spark),
            exist_classes = 'Gpu' + cpu_scan_class,
            non_exist_classes = cpu_scan_class,
            conf = conf)

@allow_non_gpu('FileSourceScanExec', 'CollectLimitExec', 'DeserializeToObjectExec')
@pytest.mark.skipif(is_before_spark_340(), reason='`preferDate` is only supported in Spark 340+')
def test_csv_prefer_date_with_infer_schema(spark_tmp_path):
10 changes: 5 additions & 5 deletions integration_tests/src/main/python/data_gen.py
@@ -572,27 +572,28 @@ def start(self, rand):

class TimestampGen(DataGen):
    """Generate Timestamps in a given range. All timezones are UTC by default."""
    def __init__(self, start=None, end=None, nullable=True):
        super().__init__(TimestampType(), nullable=nullable)
    def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
        super().__init__(TimestampNTZType() if tzinfo==None else TimestampType(), nullable=nullable)
        if start is None:
            # Spark supports times starting at
            # "0001-01-01 00:00:00.000000"
            # but it has issues if you get really close to that because it tries to do things
            # in a different format which causes roundoff, so we have to add a few days,
            # just to be sure
            start = datetime(1, 1, 3, tzinfo=timezone.utc)
            start = datetime(1, 1, 3, tzinfo=tzinfo)
        elif not isinstance(start, datetime):
            raise RuntimeError('Unsupported type passed in for start {}'.format(start))

        if end is None:
            # Spark supports time through
            # "9999-12-31 23:59:59.999999"
            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
        elif isinstance(end, timedelta):
            end = start + end
        elif not isinstance(start, date):
            raise RuntimeError('Unsupported type passed in for end {}'.format(end))

        self._epoch = datetime(1970, 1, 1, tzinfo=tzinfo)
        self._start_time = self._to_us_since_epoch(start)
        self._end_time = self._to_us_since_epoch(end)
        if (self._epoch >= start and self._epoch <= end):
@@ -601,7 +602,6 @@ def __init__(self, start=None, end=None, nullable=True):
    def _cache_repr(self):
        return super()._cache_repr() + '(' + str(self._start_time) + ',' + str(self._end_time) + ')'

    _epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    _us = timedelta(microseconds=1)

    def _to_us_since_epoch(self, val):
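For context, a minimal sketch of how the new `tzinfo` parameter might be exercised from a test (`StructGen`, `gen_df`, and `with_cpu_session` are the existing integration-test helpers; the output path is an example):

```python
# tzinfo=None makes TimestampGen report TimestampNTZType and generate naive datetimes;
# the default timezone.utc keeps the previous TimestampType behavior.
ntz_gen = StructGen([('a', TimestampGen(tzinfo=None))], nullable=False)
ltz_gen = StructGen([('a', TimestampGen())], nullable=False)
with_cpu_session(lambda spark: gen_df(spark, ntz_gen).write.json('/tmp/ntz_data'))  # example path
```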
56 changes: 55 additions & 1 deletion integration_tests/src/main/python/json_test.py
@@ -15,8 +15,10 @@
import pyspark.sql.functions as f
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_and_cpu_row_counts_equal, assert_gpu_fallback_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_and_cpu_row_counts_equal, \
    assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from data_gen import *
from datetime import timezone
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
@@ -200,6 +202,58 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
            .json(data_path),
        conf=updated_conf)

@allow_non_gpu('FileSourceScanExec', 'ProjectExec')
@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
def test_json_ts_formats_round_trip_ntz_v1(spark_tmp_path, date_format, ts_part, timestamp_type):
    json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'json', 'FileSourceScanExec')

@allow_non_gpu('BatchScanExec', 'ProjectExec')
@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
@pytest.mark.parametrize('date_format', json_supported_date_formats)
@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
def test_json_ts_formats_round_trip_ntz_v2(spark_tmp_path, date_format, ts_part, timestamp_type):
    json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, '', 'BatchScanExec')

def json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, v1_enabled_list, cpu_scan_class):
    full_format = date_format + ts_part
    data_gen = TimestampGen(tzinfo=None if timestamp_type == "TIMESTAMP_NTZ" else timezone.utc)
    gen = StructGen([('a', data_gen)], nullable=False)
    data_path = spark_tmp_path + '/JSON_DATA'
    schema = gen.data_type
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write \
            .option('timestampFormat', full_format) \
            .json(data_path))
    updated_conf = copy_and_update(_enable_all_types_conf,
        {
            'spark.sql.sources.useV1SourceList': v1_enabled_list,
            'spark.sql.timestampType': timestamp_type
        })

    def do_read(spark):
        return spark.read \
            .schema(schema) \
            .option('timestampFormat', full_format) \
            .json(data_path)

    if timestamp_type == "TIMESTAMP_LTZ":
        assert_cpu_and_gpu_are_equal_collect_with_capture(
            lambda spark : do_read(spark),
            exist_classes = 'Gpu' + cpu_scan_class,
            non_exist_classes = cpu_scan_class,
            conf=updated_conf)
    else:
        # we fall back to CPU due to "unsupported data types in output: TimestampNTZType"
        assert_gpu_fallback_collect(
            lambda spark : do_read(spark),
            cpu_fallback_class_name = cpu_scan_class,
            conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'boolean.json',