Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve support for reading CSV and JSON floating-point values #4637

Merged
merged 13 commits into from
Feb 2, 2022
22 changes: 10 additions & 12 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -194,7 +194,7 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1091')),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
Expand Down Expand Up @@ -224,11 +224,11 @@ def read_impl(spark):
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_long_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1986')),
pytest.param('empty_int_values.csv', _empty_float_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_double_schema, {'header': 'true'}),
pytest.param('nan_and_inf.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125i, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124, https://github.com/NVIDIA/spark-rapids/issues/125, https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/126')),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2071')),
pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
Expand All @@ -246,12 +246,10 @@ def test_basic_read(std_input_path, name, schema, options, read_func, v1_enabled
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen,
DoubleGen(no_nans=True), # NaN, Inf, and -Inf are not supported
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=False),
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
TimestampGen()]

@approximate_float
Expand Down
10 changes: 4 additions & 6 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
StringGen('[aAbB ]{0,10}'),
byte_gen, short_gen, int_gen, long_gen, boolean_gen,
# Once https://github.com/NVIDIA/spark-rapids/issues/125 and https://github.com/NVIDIA/spark-rapids/issues/124
# are fixed we should not have to special case float values any more.
pytest.param(double_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
pytest.param(FloatGen(no_nans=True), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/124')),
pytest.param(float_gen, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/125')),
DoubleGen(no_nans=True)
pytest.param(double_gen),
pytest.param(FloatGen(no_nans=False)),
pytest.param(float_gen),
DoubleGen(no_nans=False)
]

_enable_all_types_conf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.nvidia.spark.rapids

import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand All @@ -27,7 +28,8 @@ import org.apache.spark.TaskContext
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
Expand Down Expand Up @@ -164,7 +166,19 @@ abstract class GpuTextBasedPartitionReader(
} else {
readDataSchema
}
val cudfSchema = GpuColumnVector.from(dataSchema)

// read floating-point columns as strings in cuDF
val dataSchemaWithStrings = StructType(dataSchema.fields
.map(f => {
f.dataType match {
case DataTypes.FloatType | DataTypes.DoubleType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
}
}))
val cudfSchema = GpuColumnVector.from(dataSchemaWithStrings)

// about to start using the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get(), metrics(SEMAPHORE_WAIT_TIME))

Expand All @@ -175,7 +189,29 @@ abstract class GpuTextBasedPartitionReader(
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)

handleResult(newReadDataSchema, table)
// parse floating-point columns that were read as strings
val castTable = withResource(table) { _ =>
val columns = new ListBuffer[ColumnVector]()
val ansiEnabled = SQLConf.get.ansiEnabled
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = dataSchema.fields(i).dataType match {
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
case DataTypes.DoubleType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case _ =>
table.getColumn(i).incRefCount()
}
columns += castColumn
}
// Table increases the ref counts on the columns so we have
// to close them after creating the table
withResource(columns) { _ =>
jlowe marked this conversation as resolved.
Show resolved Hide resolved
new Table(columns: _*)
}
}

handleResult(newReadDataSchema, castTable)
}
} finally {
dataBuffer.close()
Expand Down