diff --git a/docs/compatibility.md b/docs/compatibility.md
index 635eeff2762..89ee85c706a 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -377,14 +377,7 @@ date. Typically, one that overflowed.
 
 ### CSV Floating Point
 
-The CSV parser is not able to parse `NaN` values. These are
-likely to be turned into null values, as described in this
-[issue](https://github.com/NVIDIA/spark-rapids/issues/125).
-
-Some floating-point values also appear to overflow but do not for the CPU as described in this
-[issue](https://github.com/NVIDIA/spark-rapids/issues/124).
-
-Any number that overflows will not be turned into a null value.
+Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).
 
 Also parsing of some values will not produce bit for bit identical results to what the CPU does.
 They are within round-off errors except when they are close enough to overflow to Inf or -Inf which
@@ -480,7 +473,21 @@ The nested types(array, map and struct) are not supported yet in current version
 
 ### JSON Floating Point
 
-Like the CSV reader, the JSON reader has the same floating point issue. Please refer to [CSV Floating Point](#csv-floating-point) section.
+Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).
+
+The GPU JSON reader does not support `NaN` and `Inf` values in a way that is fully compatible with Spark.
+
+The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
+these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU.
+
+```json
+{ "number": "NaN" }
+{ "number": "Infinity" }
+{ "number": "-Infinity" }
+```
+
+Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values in cases
+where Spark treats them as invalid input and simply returns `null`.
 
 ## LIKE
 
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index f10166fc078..0aa1cb50b4d 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -194,7 +194,7 @@ def read_impl(spark):
 @pytest.mark.parametrize('name,schema,options', [
     ('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
     ('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
-    pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')),
+    pytest.param('ts.csv', _date_schema, {}),
     pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
     ('ts.csv', _ts_schema, {}),
     ('str.csv', _bad_str_schema, {'header': 'true'}),
@@ -224,19 +224,25 @@ def read_impl(spark):
     pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
     pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
     pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
-    pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
+    pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
+    pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
     pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125i, https://github.com/NVIDIA/spark-rapids/issues/126')),
-    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
+    pytest.param('floats_invalid.csv', _float_schema, {'header': 'true'}),
+    pytest.param('floats_invalid.csv', _double_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
+    pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
     pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
     pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
     pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
     ], ids=idfn)
 @pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
 @pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
-def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled_list):
-    updated_conf=copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
+def test_basic_csv_read(std_input_path, name, schema, options, read_func, v1_enabled_list, ansi_enabled):
+    updated_conf=copy_and_update(_enable_all_types_conf, {
+        'spark.sql.sources.useV1SourceList': v1_enabled_list,
+        'spark.sql.ansi.enabled': ansi_enabled
+    })
     assert_gpu_and_cpu_are_equal_collect(read_func(std_input_path + '/' + name, schema, options),
             conf=updated_conf)
 
@@ -245,13 +251,13 @@
     # This would require multiLine reads to work correctly so we avoid these chars
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
+    StringGen('[nN][aA][nN]'),
+    StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
-    DoubleGen(no_nans=True), # NaN, Inf, and -Inf are not supported
-    # Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
-    # are fixed we should not have to special case float values any more.
-    pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
-    pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
+    DoubleGen(no_nans=False),
+    pytest.param(double_gen),
+    pytest.param(FloatGen(no_nans=False)),
+    pytest.param(float_gen),
     TimestampGen()]
 
 @approximate_float
diff --git a/integration_tests/src/main/python/json_test.py b/integration_tests/src/main/python/json_test.py
index 436ea277786..d2725b2f7b8 100644
--- a/integration_tests/src/main/python/json_test.py
+++ b/integration_tests/src/main/python/json_test.py
@@ -25,19 +25,44 @@
     # This would require multiLine reads to work correctly, so we avoid these chars
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
     StringGen('[aAbB ]{0,10}'),
+    StringGen('[nN][aA][nN]'),
+    StringGen('[+-]?[iI][nN][fF]([iI][nN][iI][tT][yY])?'),
     byte_gen, short_gen, int_gen, long_gen, boolean_gen,
-    # Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
-    # are fixed we should not have to special case float values any more.
-    pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
-    pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
-    DoubleGen(no_nans=True)
+    pytest.param(double_gen),
+    pytest.param(FloatGen(no_nans=False)),
+    pytest.param(float_gen),
+    DoubleGen(no_nans=False)
 ]
 
 _enable_all_types_conf = {
     'spark.rapids.sql.format.json.enabled': 'true',
     'spark.rapids.sql.format.json.read.enabled': 'true'}
 
+_float_schema = StructType([
+    StructField('number', FloatType())])
+
+_double_schema = StructType([
+    StructField('number', DoubleType())])
+
+def read_json_df(data_path, schema, options = {}):
+    def read_impl(spark):
+        reader = spark.read
+        if schema is not None:
+            reader = reader.schema(schema)
+        for key, value in options.items():
+            reader = reader.option(key, value)
+        return debug_df(reader.json(data_path))
+    return read_impl
+
+def read_json_sql(data_path, schema, options = {}):
+    opts = options
+    if schema is not None:
+        opts = copy_and_update(options, {'schema': schema})
+    def read_impl(spark):
+        spark.sql('DROP TABLE IF EXISTS `TMP_json_TABLE`')
+        return spark.catalog.createTable('TMP_json_TABLE', source='json', path=data_path, **opts)
+    return read_impl
+
 @approximate_float
 @pytest.mark.parametrize('data_gen', [
     StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
@@ -138,4 +163,24 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
         .schema(schema)\
         .option('timestampFormat', full_format)\
         .json(data_path),
-        conf=updated_conf)
\ No newline at end of file
+        conf=updated_conf)
+
+@approximate_float
+@pytest.mark.parametrize('filename', [
+    'nan_and_inf.json',
+    pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
+    'floats.json',
+    'floats_invalid.json',
+    pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
+])
+@pytest.mark.parametrize('schema', [_float_schema, _double_schema])
+@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
+@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
+@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
+def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, ansi_enabled):
+    updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
+    assert_gpu_and_cpu_are_equal_collect(
+        read_func(std_input_path + '/' + filename,
+                  schema,
+                  { "allowNonNumericNumbers": allow_non_numeric_numbers }),
+        conf=updated_conf)
diff --git a/integration_tests/src/test/resources/floats.json b/integration_tests/src/test/resources/floats.json
new file mode 100644
index 00000000000..b24a003c1ab
--- /dev/null
+++ b/integration_tests/src/test/resources/floats.json
@@ -0,0 +1,5 @@
+{ "number": -3.141592 }
+{ "number": 3.141592 }
+{ "number": 0.0 }
+{ "number": -0.0 }
+{ "number": -3.4028234663852886e+38 }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/floats_edge_cases.json b/integration_tests/src/test/resources/floats_edge_cases.json
new file mode 100644
index 00000000000..504d783418d
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_edge_cases.json
@@ -0,0 +1,3 @@
+{ "number": "-3.141592" }
+{ "number": "3.141592" }
+{ "number": "-3.4028234663852886e+38" }
diff --git a/integration_tests/src/test/resources/floats_invalid.csv b/integration_tests/src/test/resources/floats_invalid.csv
new file mode 100644
index 00000000000..ccfdaaf08ae
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_invalid.csv
@@ -0,0 +1,5 @@
+"number"
+true
+false
+bad
+"bad"
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/floats_invalid.json b/integration_tests/src/test/resources/floats_invalid.json
new file mode 100644
index 00000000000..60b1845ebf1
--- /dev/null
+++ b/integration_tests/src/test/resources/floats_invalid.json
@@ -0,0 +1,3 @@
+{ "number": true }
+{ "number": "not a float" }
+{ "number": "" }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/nan_and_inf.csv b/integration_tests/src/test/resources/nan_and_inf.csv
index b2f8f78e751..51e148e9d37 100644
--- a/integration_tests/src/test/resources/nan_and_inf.csv
+++ b/integration_tests/src/test/resources/nan_and_inf.csv
@@ -1,8 +1,13 @@
 "number"
 NaN
 Inf
++Inf
 -Inf
 NAN
 nan
 INF
++INF
 -INF
+Infinity
++Infinity
+-Infinity
diff --git a/integration_tests/src/test/resources/nan_and_inf.json b/integration_tests/src/test/resources/nan_and_inf.json
new file mode 100644
index 00000000000..e4aab168de4
--- /dev/null
+++ b/integration_tests/src/test/resources/nan_and_inf.json
@@ -0,0 +1,3 @@
+{ "number": "NaN" }
+{ "number": "Infinity" }
+{ "number": "-Infinity" }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/nan_and_inf_edge_cases.json b/integration_tests/src/test/resources/nan_and_inf_edge_cases.json
new file mode 100644
index 00000000000..c27a2291626
--- /dev/null
+++ b/integration_tests/src/test/resources/nan_and_inf_edge_cases.json
@@ -0,0 +1,12 @@
+{ "number": "NAN" }
+{ "number": "nan" }
+{ "number": "INF" }
+{ "number": "+INF" }
+{ "number": "-INF" }
+{ "number": INF }
+{ "number": +INF }
+{ "number": -INF }
+{ "number": "Inf" }
+{ "number": "+Inf" }
+{ "number": "-Inf" }
+{ "number": "+Infinity" }
\ No newline at end of file
diff --git a/integration_tests/src/test/resources/simple_float_values.csv b/integration_tests/src/test/resources/simple_float_values.csv
index f7a20131283..1fdc6e048b5 100644
--- a/integration_tests/src/test/resources/simple_float_values.csv
+++ b/integration_tests/src/test/resources/simple_float_values.csv
@@ -16,12 +16,4 @@ bad
 1.7976931348623157E308
 1.7976931348623157e+308
 1.7976931348623158E308
-1.2e-234
-NAN
-nan
-NaN
-Inf
--Inf
-INF
--INF
-
+1.2e-234
\ No newline at end of file
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
index d67b710382a..515db04fc11 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala
@@ -16,9 +16,10 @@
 
 package com.nvidia.spark.rapids
 
+import scala.collection.mutable.ListBuffer
 import scala.math.max
 
-import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
+import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.CompressionCodecFactory
@@ -27,7 +28,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.sql.connector.read.PartitionReader
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -164,7 +165,19 @@
       } else {
         readDataSchema
       }
-      val cudfSchema = GpuColumnVector.from(dataSchema)
+
+      // read floating-point columns as strings in cuDF
+      val dataSchemaWithStrings = StructType(dataSchema.fields
+        .map(f => {
+          f.dataType match {
+            case DataTypes.FloatType | DataTypes.DoubleType =>
+              f.copy(dataType = DataTypes.StringType)
+            case _ =>
+              f
+          }
+        }))
+      val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings)
+
       // about to start using the GPU
       GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))
@@ -175,7 +188,30 @@
         }
         maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)
 
-        handleResult(newReadDataSchema, table)
+        // parse floating-point columns that were read as strings
+        val castTable = withResource(table) { _ =>
+          val columns = new ListBuffer[ColumnVector]()
+          // Table increases the ref counts on the columns so we have
+          // to close them after creating the table
+          withResource(columns) { _ =>
+            // ansi mode does not apply to text inputs
+            val ansiEnabled = false
+            for (i <- 0 until table.getNumberOfColumns) {
+              val castColumn = dataSchema.fields(i).dataType match {
+                case DataTypes.FloatType =>
+                  GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
+                case DataTypes.DoubleType =>
+                  GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
+                case _ =>
+                  table.getColumn(i).incRefCount()
+              }
+              columns += castColumn
+            }
+            new Table(columns: _*)
+          }
+        }
+
+        handleResult(newReadDataSchema, castTable)
       }
     } finally {
       dataBuffer.close()
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
index 95ed3a5b2be..05104801314 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/HashAggregatesSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2021, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -59,7 +59,8 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
       conf: SparkConf = new SparkConf(),
       execsAllowedNonGpu: Seq[String] = Seq.empty,
       batchSize: Int = 0,
-      repart: Int = 1)
+      repart: Int = 1,
+      maxFloatDiff: Double = 0.0)
       (fn: DataFrame => DataFrame) {
     if (batchSize > 0) {
       makeBatchedBytes(batchSize, conf)
@@ -69,7 +70,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
     testSparkResultsAreEqual(testName, df,
       conf = conf, repart = repart,
       execsAllowedNonGpu = execsAllowedNonGpu,
-      incompat = true, sort = true)(fn)
+      incompat = true, sort = true, maxFloatDiff = maxFloatDiff)(fn)
   }
 
   def firstDf(spark: SparkSession): DataFrame = {
@@ -637,6 +638,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
       "doubles basic aggregates group by doubles",
       doubleCsvDf,
+      maxFloatDiff = 0.000001,
       conf = makeBatchedBytes(3, enableCsvConf())) { frame =>
     frame.groupBy("doubles").agg(
       lit(456f),
@@ -653,6 +655,7 @@ class HashAggregatesSuite extends SparkQueryCompareTestSuite {
   FLOAT_TEST_testSparkResultsAreEqual(
       "doubles basic aggregates group by more_doubles",
      doubleCsvDf,
+      maxFloatDiff = 0.000001,
      conf = makeBatchedBytes(3, enableCsvConf())) { frame =>
     frame.groupBy("more_doubles").agg(
       lit(456f),
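
---

Reviewer note: the core of this patch is easiest to read as a two-step pattern in `GpuTextBasedPartitionReader`: floating-point columns are requested from cuDF as strings, then converted with the same `GpuCast.castStringToFloats` path used for `CAST(string AS float)`, which is why `compatibility.md` now points both CSV and JSON parsing at the string-to-float casting limitations. Below is a condensed, self-contained sketch of that pattern under stated assumptions: `readAsTable` is a hypothetical stand-in for the actual cuDF read call, and `withResource` is assumed to come from the plugin's `Arm` trait; `GpuCast.castStringToFloats`, `Table`, and `ColumnVector` are the real APIs exercised in the diff.

```scala
import scala.collection.mutable.ListBuffer

import ai.rapids.cudf.{ColumnVector, DType, Table}

import org.apache.spark.sql.types.{DataTypes, StructType}

// Illustrative sketch only, not the actual reader implementation.
object FloatReadSketch extends Arm {

  def readWithFloatsAsStrings(
      dataSchema: StructType,
      readAsTable: StructType => Table): Table = {
    // Step 1: ask cuDF for strings wherever Spark expects floats or doubles,
    // so cuDF's own floating-point parsing (with its NaN/Inf quirks) is bypassed.
    val dataSchemaWithStrings = StructType(dataSchema.fields.map { f =>
      f.dataType match {
        case DataTypes.FloatType | DataTypes.DoubleType =>
          f.copy(dataType = DataTypes.StringType)
        case _ => f
      }
    })
    val table = readAsTable(dataSchemaWithStrings)

    // Step 2: cast the string columns to their requested types through the
    // same code path as CAST(string AS float). ANSI mode does not apply to
    // text sources, hence the hard-coded false.
    val ansiEnabled = false
    withResource(table) { _ =>
      val columns = new ListBuffer[ColumnVector]()
      // The new Table takes its own references, so close our copies afterwards.
      withResource(columns) { _ =>
        for (i <- 0 until table.getNumberOfColumns) {
          columns += (dataSchema.fields(i).dataType match {
            case DataTypes.FloatType =>
              GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
            case DataTypes.DoubleType =>
              GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
            case _ =>
              // Pass-through column is still owned by the original table,
              // so take an extra reference before handing it to the new Table.
              table.getColumn(i).incRefCount()
          })
        }
        new Table(columns: _*)
      }
    }
  }
}
```

Because invalid strings come back from the non-ANSI cast as nulls, matching Spark's behavior, this is presumably what lets the previously xfailed `simple_float_values.csv` cases and the new `floats_invalid.*` fixtures run as ordinary pass cases in the tests above.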