Skip to content

Commit

Permalink
Revert ":Revert "Improve dateFormat support in GpuJsonScan and make t…
Browse files Browse the repository at this point in the history
…ests consistent with GpuStructsToJson [databricks] (NVIDIA#9975)""

This reverts commit 90a7dca.
  • Loading branch information
andygrove committed Jan 10, 2024
1 parent 1990119 commit f243f91
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 156 deletions.
2 changes: 1 addition & 1 deletion docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.JsonToStr

Dates are partially supported but there are some known issues:

- Only the default `dateFormat` of `yyyy-MM-dd` is supported. The query will fall back to CPU if any other format
- Only the default `dateFormat` of `yyyy-MM-dd` is supported in Spark 3.1.x. The query will fall back to CPU if any other format
is specified ([#9667](https://github.com/NVIDIA/spark-rapids/issues/9667))
- Strings containing integers with more than four digits will be
parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
Expand Down
275 changes: 176 additions & 99 deletions integration_tests/src/main/python/json_test.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -359,7 +359,7 @@ abstract class CSVPartitionReaderBase[BUFF <: LineBufferer, FACT <: LineBufferer
}
}

override def dateFormat: String = GpuCsvUtils.dateFormatInRead(parsedOptions)
override def dateFormat: Option[String] = Some(GpuCsvUtils.dateFormatInRead(parsedOptions))
override def timestampFormat: String = GpuCsvUtils.timestampFormatInRead(parsedOptions)
}

Expand Down
14 changes: 11 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1301,7 +1301,8 @@ object GpuCast {
def convertDateOrNull(
input: ColumnVector,
regex: String,
cudfFormat: String): ColumnVector = {
cudfFormat: String,
failOnInvalid: Boolean = false): ColumnVector = {

val prog = new RegexProgram(regex, CaptureGroups.NON_CAPTURE)
val isValidDate = withResource(input.matchesRe(prog)) { isMatch =>
Expand All @@ -1311,6 +1312,13 @@ object GpuCast {
}

withResource(isValidDate) { _ =>
if (failOnInvalid) {
withResource(isValidDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
}
}
}
withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { orElse =>
withResource(input.asTimestampDays(cudfFormat)) { asDays =>
isValidDate.ifElse(asDays, orElse)
Expand Down Expand Up @@ -1393,7 +1401,7 @@ object GpuCast {
}
}

private def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
val result = castStringToDate(input)
if (ansiMode) {
// When ANSI mode is enabled, we need to throw an exception if any values could not be
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids

import java.time.DateTimeException
import java.util
import java.util.Optional

import scala.collection.mutable.ListBuffer
Expand All @@ -26,7 +27,6 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.DateUtils.{toStrf, TimestampFormatConversionException}
import com.nvidia.spark.rapids.jni.CastStrings
import com.nvidia.spark.rapids.shims.GpuTypeShims
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand Down Expand Up @@ -372,18 +372,14 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
}
}

def dateFormat: String
def dateFormat: Option[String]
def timestampFormat: String

def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
castStringToDate(input, dt, failOnInvalid = true)
}

def castStringToDate(input: ColumnVector, dt: DType, failOnInvalid: Boolean): ColumnVector = {
val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true)
val cudfFormat = DateUtils.toStrf(dateFormat.getOrElse("yyyy-MM-dd"), parseString = true)
withResource(input.strip()) { stripped =>
withResource(stripped.isTimestamp(cudfFormat)) { isDate =>
if (failOnInvalid && GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) {
if (GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) {
withResource(isDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuJsonToStructsShim, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
Expand Down Expand Up @@ -113,16 +113,15 @@ object GpuJsonScan {

val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
if (hasDates) {
GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd") =>
// this is fine
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
GpuJsonToStructsShim.tagDateFormatSupport(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))
}

val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[TimestampType])
if (hasTimestamps) {
GpuJsonToStructsShim.tagTimestampFormatSupport(meta,
GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions))

GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") =>
// this is fine
Expand Down Expand Up @@ -163,10 +162,16 @@ object GpuJsonScan {
tagSupportOptions(parsedOptions, meta)

val types = readSchema.map(_.dataType)
if (types.contains(DateType)) {

val hasDates = TrampolineUtil.dataTypeExistsRecursively(readSchema, _.isInstanceOf[DateType])
if (hasDates) {

GpuTextBasedDateUtils.tagCudfFormat(meta,
GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true)

GpuJsonToStructsShim.tagDateFormatSupportFromScan(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))

// For date type, timezone needs to be checked also. This is because JVM timezone is used
// to get days offset before rebasing Julian to Gregorian in Spark while not in Rapids.
//
Expand Down Expand Up @@ -446,6 +451,10 @@ class JsonPartitionReader(
}
}

override def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
GpuJsonToStructsShim.castJsonStringToDateFromScan(input, dt, dateFormat)
}

/**
* JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification.
*
Expand Down Expand Up @@ -490,6 +499,6 @@ class JsonPartitionReader(
}
}

override def dateFormat: String = GpuJsonUtils.dateFormatInRead(parsedOptions)
override def dateFormat: Option[String] = GpuJsonUtils.optionalDateFormatInRead(parsedOptions)
override def timestampFormat: String = GpuJsonUtils.timestampFormatInRead(parsedOptions)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,36 +17,28 @@
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "334"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, Scalar}
import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.{GpuCast, GpuOverrides, RapidsMeta}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuCast

import org.apache.spark.sql.catalyst.json.GpuJsonUtils
import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy

object GpuJsonToStructsShim {
def tagDateFormatSupport(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
GpuJsonUtils.dateFormatInRead(options) match {
case "yyyy-MM-dd" =>
GpuJsonUtils.optionalDateFormatInRead(options) match {
case None | Some("yyyy-MM-dd") =>
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
Expand All @@ -58,6 +50,27 @@ object GpuJsonToStructsShim {
}
}

def tagDateFormatSupportFromScan(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
tagDateFormatSupport(meta, dateFormat)
}

def castJsonStringToDateFromScan(input: ColumnVector, dt: DType,
dateFormat: Option[String]): ColumnVector = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
withResource(input.strip()) { trimmed =>
GpuCast.castStringToDateAnsi(trimmed, ansiMode =
GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
case other =>
// should be unreachable due to GpuOverrides checks
throw new IllegalStateException(s"Unsupported dateFormat $other")
}
}

def tagTimestampFormatSupport(meta: RapidsMeta[_, _, _],
timestampFormat: Option[String]): Unit = {}

def castJsonStringToTimestamp(input: ColumnVector,
options: Map[String, String]): ColumnVector = {
withResource(Scalar.fromString(" ")) { space =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*** spark-rapids-shim-json-lines
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "334"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.{DateUtils, GpuCast, GpuOverrides, RapidsMeta}
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy

object GpuJsonToStructsShim {

def tagDateFormatSupport(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
// dateFormat is ignored by JsonToStructs in Spark 3.2.x and 3.3.x because it just
// performs a regular cast from string to date
}

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
// dateFormat is ignored in from_json in Spark 3.2
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
}
}
}

def tagDateFormatSupportFromScan(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
}

def castJsonStringToDateFromScan(input: ColumnVector, dt: DType,
dateFormat: Option[String]): ColumnVector = {
dateFormat match {
case None =>
// legacy behavior
withResource(input.strip()) { trimmed =>
GpuCast.castStringToDateAnsi(trimmed, ansiMode =
GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
case Some(fmt) =>
withResource(input.strip()) { trimmed =>
val regexRoot = fmt
.replace("yyyy", raw"\d{4}")
.replace("MM", raw"\d{1,2}")
.replace("dd", raw"\d{1,2}")
val cudfFormat = DateUtils.toStrf(fmt, parseString = true)
GpuCast.convertDateOrNull(trimmed, "^" + regexRoot + "$", cudfFormat,
failOnInvalid = GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
}
}

def tagTimestampFormatSupport(meta: RapidsMeta[_, _, _],
timestampFormat: Option[String]): Unit = {
// we only support the case where no format is specified
timestampFormat.foreach(f => meta.willNotWorkOnGpu(s"Unsupported timestampFormat: $f"))
}

def castJsonStringToTimestamp(input: ColumnVector,
options: Map[String, String]): ColumnVector = {
// legacy behavior
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
// from_json doesn't respect ansi mode
GpuCast.castStringToTimestamp(trimmed, ansiMode = false)
}
}
}
}
Loading

0 comments on commit f243f91

Please sign in to comment.