diff --git a/docs/compatibility.md b/docs/compatibility.md
index b5cb01757dd..01f9707e17a 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -123,9 +123,9 @@ Only a limited set of formats are supported when parsing dates.
 ### CSV Timestamps
 
 The CSV parser does not support time zones. It will ignore any trailing time zone information,
-despite the format asking for a `XXX` or `[XXX]`. As such it is off by default and you can enable it
-by setting [`spark.rapids.sql.csvTimestamps.enabled`](configs.md#sql.csvTimestamps.enabled) to
-`true`.
+despite the format asking for a `XXX` or `[XXX]`. The CSV parser does not support the `TimestampNTZ`
+type and will fall back to CPU if `spark.sql.timestampType` is set to `TIMESTAMP_NTZ` or if an
+explicit schema is provided that contains the `TimestampNTZ` type.
 
 The formats supported for timestamps are limited similar to dates. The first part of the format
 must be a supported date format. The second part must start with a `'T'` to separate the time
@@ -350,6 +350,9 @@ Spark will treat them as invalid inputs and will just return `null`.
 
 ### JSON Timestamps
 
+The JSON parser does not support the `TimestampNTZ` type and will fall back to CPU if `spark.sql.timestampType` is
+set to `TIMESTAMP_NTZ` or if an explicit schema is provided that contains the `TimestampNTZ` type.
+
 There is currently no support for reading numeric values as timestamps and null values are returned
 instead ([#4940](https://github.com/NVIDIA/spark-rapids/issues/4940)). A workaround would be to
 read as longs and then cast to timestamp.
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index 5227dd0a41c..bae076f5a4d 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -551,6 +551,58 @@ def test_csv_read_count(spark_tmp_path):
 
     assert_gpu_and_cpu_row_counts_equal(lambda spark: spark.read.csv(data_path))
 
+@allow_non_gpu('FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec')
+@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
+@pytest.mark.parametrize('date_format', csv_supported_date_formats)
+@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
+@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
+def test_csv_infer_schema_timestamp_ntz_v1(spark_tmp_path, date_format, ts_part, timestamp_type):
+    csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'csv', 'FileSourceScanExec')
+
+@allow_non_gpu('BatchScanExec', 'FileSourceScanExec', 'ProjectExec', 'CollectLimitExec', 'DeserializeToObjectExec')
+@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
+@pytest.mark.parametrize('date_format', csv_supported_date_formats)
+@pytest.mark.parametrize('ts_part', csv_supported_ts_parts)
+@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
+def test_csv_infer_schema_timestamp_ntz_v2(spark_tmp_path, date_format, ts_part, timestamp_type):
+    csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, '', 'BatchScanExec')
+
+def csv_infer_schema_timestamp_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, v1_enabled_list, cpu_scan_class):
+    full_format = date_format + ts_part
+    # specify to use no timezone rather than defaulting to UTC
+    data_gen = TimestampGen(tzinfo=None)
+    gen = StructGen([('a', data_gen)], nullable=False)
+    data_path = spark_tmp_path + '/CSV_DATA'
+    with_cpu_session(
+        lambda spark : gen_df(spark, gen).write
+            .option('timestampFormat', full_format)
+            .csv(data_path))
+
+    def do_read(spark):
+        return spark.read.option("inferSchema", "true") \
+            .option('timestampFormat', full_format) \
+            .csv(data_path)
+
+    conf = { 'spark.sql.timestampType': timestamp_type,
+             'spark.sql.sources.useV1SourceList': v1_enabled_list }
+
+    # determine whether Spark CPU infers TimestampType or TimestampNtzType
+    inferred_type = with_cpu_session(
+        lambda spark : do_read(spark).schema["_c0"].dataType.typeName(), conf=conf)
+
+    if inferred_type == "timestamp_ntz":
+        # we fall back to CPU due to "unsupported data types in output: TimestampNTZType"
+        assert_gpu_fallback_collect(
+            lambda spark: do_read(spark),
+            cpu_fallback_class_name = cpu_scan_class,
+            conf = conf)
+    else:
+        assert_cpu_and_gpu_are_equal_collect_with_capture(
+            lambda spark: do_read(spark),
+            exist_classes = 'Gpu' + cpu_scan_class,
+            non_exist_classes = cpu_scan_class,
+            conf = conf)
+
 @allow_non_gpu('FileSourceScanExec', 'CollectLimitExec', 'DeserializeToObjectExec')
 @pytest.mark.skipif(is_before_spark_340(), reason='`preferDate` is only supported in Spark 340+')
 def test_csv_prefer_date_with_infer_schema(spark_tmp_path):
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index ab9890b0712..9f549adfa46 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -572,27 +572,28 @@ def start(self, rand):
 
 class TimestampGen(DataGen):
     """Generate Timestamps in a given range. All timezones are UTC by default."""
-    def __init__(self, start=None, end=None, nullable=True):
-        super().__init__(TimestampType(), nullable=nullable)
+    def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
+        super().__init__(TimestampNTZType() if tzinfo==None else TimestampType(), nullable=nullable)
         if start is None:
             # Spark supports times starting at
             # "0001-01-01 00:00:00.000000"
             # but it has issues if you get really close to that because it tries to do things
             # in a different format which causes roundoff, so we have to add a few days,
             # just to be sure
-            start = datetime(1, 1, 3, tzinfo=timezone.utc)
+            start = datetime(1, 1, 3, tzinfo=tzinfo)
         elif not isinstance(start, datetime):
             raise RuntimeError('Unsupported type passed in for start {}'.format(start))
         if end is None:
             # Spark supports time through
             # "9999-12-31 23:59:59.999999"
-            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
+            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
         elif isinstance(end, timedelta):
             end = start + end
         elif not isinstance(start, date):
             raise RuntimeError('Unsupported type passed in for end {}'.format(end))
+        self._epoch = datetime(1970, 1, 1, tzinfo=tzinfo)
         self._start_time = self._to_us_since_epoch(start)
         self._end_time = self._to_us_since_epoch(end)
         if (self._epoch >= start and self._epoch <= end):
@@ -601,7 +602,6 @@ def __init__(self, start=None, end=None, nullable=True):
     def _cache_repr(self):
         return super()._cache_repr() + '(' + str(self._start_time) + ',' + str(self._end_time) + ')'
 
-    _epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
     _us = timedelta(microseconds=1)
 
     def _to_us_since_epoch(self, val):
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 7c8b2499130..181e8f11bd2 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -15,8 +15,10 @@
 import pyspark.sql.functions as f
 import pytest
 
-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_and_cpu_row_counts_equal, assert_gpu_fallback_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_and_cpu_row_counts_equal, \
+    assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
 from data_gen import *
+from datetime import timezone
 from conftest import is_databricks_runtime
 from marks import approximate_float, allow_non_gpu, ignore_order
 from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
@@ -200,6 +202,58 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
             .json(data_path),
         conf=updated_conf)
 
+@allow_non_gpu('FileSourceScanExec', 'ProjectExec')
+@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
+@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
+@pytest.mark.parametrize('date_format', json_supported_date_formats)
+@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
+def test_json_ts_formats_round_trip_ntz_v1(spark_tmp_path, date_format, ts_part, timestamp_type):
+    json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, 'json', 'FileSourceScanExec')
+
+@allow_non_gpu('BatchScanExec', 'ProjectExec')
+@pytest.mark.skipif(is_before_spark_340(), reason='`TIMESTAMP_NTZ` is only supported in Spark 340+')
+@pytest.mark.parametrize('ts_part', json_supported_ts_parts)
+@pytest.mark.parametrize('date_format', json_supported_date_formats)
+@pytest.mark.parametrize("timestamp_type", ["TIMESTAMP_LTZ", "TIMESTAMP_NTZ"])
+def test_json_ts_formats_round_trip_ntz_v2(spark_tmp_path, date_format, ts_part, timestamp_type):
+    json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, '', 'BatchScanExec')
+
+def json_ts_formats_round_trip_ntz(spark_tmp_path, date_format, ts_part, timestamp_type, v1_enabled_list, cpu_scan_class):
+    full_format = date_format + ts_part
+    data_gen = TimestampGen(tzinfo=None if timestamp_type == "TIMESTAMP_NTZ" else timezone.utc)
+    gen = StructGen([('a', data_gen)], nullable=False)
+    data_path = spark_tmp_path + '/JSON_DATA'
+    schema = gen.data_type
+    with_cpu_session(
+        lambda spark : gen_df(spark, gen).write \
+            .option('timestampFormat', full_format) \
+            .json(data_path))
+    updated_conf = copy_and_update(_enable_all_types_conf,
+                                   {
+                                       'spark.sql.sources.useV1SourceList': v1_enabled_list,
+                                       'spark.sql.timestampType': timestamp_type
+                                   })
+
+    def do_read(spark):
+        return spark.read \
+            .schema(schema) \
+            .option('timestampFormat', full_format) \
+            .json(data_path)
+
+
+    if timestamp_type == "TIMESTAMP_LTZ":
+        assert_cpu_and_gpu_are_equal_collect_with_capture(
+            lambda spark : do_read(spark),
+            exist_classes = 'Gpu' + cpu_scan_class,
+            non_exist_classes = cpu_scan_class,
+            conf=updated_conf)
+    else:
+        # we fall back to CPU due to "unsupported data types in output: TimestampNTZType"
+        assert_gpu_fallback_collect(
+            lambda spark : do_read(spark),
+            cpu_fallback_class_name = cpu_scan_class,
+            conf=updated_conf)
+
 @approximate_float
 @pytest.mark.parametrize('filename', [
     'boolean.json',
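
To make the fallback condition described in the compatibility notes concrete, here is a minimal sketch, not part of the patch, of the two ways a JSON read can end up with `TimestampNTZ` output and therefore stay on the CPU. It assumes Spark 3.4+, an active `spark` session with the RAPIDS Accelerator enabled, and a hypothetical `data_path` pointing at JSON files that contain an ISO-8601 timestamp column.

```python
from pyspark.sql.types import StructType, StructField, TimestampNTZType

# Hypothetical path; any JSON files with a timestamp-formatted column would do.
data_path = "/tmp/JSON_DATA"

# Option 1: let schema inference produce TimestampNTZ (Spark 3.4+).
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

# Option 2: supply an explicit schema that contains TimestampNTZType.
schema = StructType([StructField("a", TimestampNTZType())])

df = spark.read.schema(schema).json(data_path)

# Per the doc change above, either option should make the scan fall back to the
# CPU (FileSourceScanExec / BatchScanExec instead of a Gpu* scan), which the
# physical plan printed by explain() makes visible.
df.explain()
```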