Skip to content

Commit

Permalink
:Revert "Improve dateFormat support in GpuJsonScan and make tests con…
Browse files Browse the repository at this point in the history
…sistent with GpuStructsToJson [databricks] (NVIDIA#9975)"

This reverts commit 47047a9.
  • Loading branch information
andygrove committed Jan 10, 2024
1 parent 708bbac commit 90a7dca
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 391 deletions.
2 changes: 1 addition & 1 deletion docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ with Spark, and can be enabled by setting `spark.rapids.sql.expression.JsonToStr

Dates are partially supported but there are some known issues:

- Only the default `dateFormat` of `yyyy-MM-dd` is supported in Spark 3.1.x. The query will fall back to CPU if any other format
- Only the default `dateFormat` of `yyyy-MM-dd` is supported. The query will fall back to CPU if any other format
is specified ([#9667](https://github.com/NVIDIA/spark-rapids/issues/9667))
- Strings containing integers with more than four digits will be
parsed as null ([#9664](https://github.com/NVIDIA/spark-rapids/issues/9664)) whereas Spark versions prior to 3.4
Expand Down
275 changes: 99 additions & 176 deletions integration_tests/src/main/python/json_test.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -359,7 +359,7 @@ abstract class CSVPartitionReaderBase[BUFF <: LineBufferer, FACT <: LineBufferer
}
}

override def dateFormat: Option[String] = Some(GpuCsvUtils.dateFormatInRead(parsedOptions))
override def dateFormat: String = GpuCsvUtils.dateFormatInRead(parsedOptions)
override def timestampFormat: String = GpuCsvUtils.timestampFormatInRead(parsedOptions)
}

Expand Down
14 changes: 3 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1301,8 +1301,7 @@ object GpuCast {
def convertDateOrNull(
input: ColumnVector,
regex: String,
cudfFormat: String,
failOnInvalid: Boolean = false): ColumnVector = {
cudfFormat: String): ColumnVector = {

val prog = new RegexProgram(regex, CaptureGroups.NON_CAPTURE)
val isValidDate = withResource(input.matchesRe(prog)) { isMatch =>
Expand All @@ -1312,13 +1311,6 @@ object GpuCast {
}

withResource(isValidDate) { _ =>
if (failOnInvalid) {
withResource(isValidDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
}
}
}
withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { orElse =>
withResource(input.asTimestampDays(cudfFormat)) { asDays =>
isValidDate.ifElse(asDays, orElse)
Expand Down Expand Up @@ -1401,7 +1393,7 @@ object GpuCast {
}
}

def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
private def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
val result = castStringToDate(input)
if (ansiMode) {
// When ANSI mode is enabled, we need to throw an exception if any values could not be
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package com.nvidia.spark.rapids

import java.time.DateTimeException
import java.util
import java.util.Optional

import scala.collection.mutable.ListBuffer
Expand All @@ -27,6 +26,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.DateUtils.{toStrf, TimestampFormatConversionException}
import com.nvidia.spark.rapids.jni.CastStrings
import com.nvidia.spark.rapids.shims.GpuTypeShims
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
Expand Down Expand Up @@ -372,14 +372,18 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
}
}

def dateFormat: Option[String]
def dateFormat: String
def timestampFormat: String

def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
val cudfFormat = DateUtils.toStrf(dateFormat.getOrElse("yyyy-MM-dd"), parseString = true)
castStringToDate(input, dt, failOnInvalid = true)
}

def castStringToDate(input: ColumnVector, dt: DType, failOnInvalid: Boolean): ColumnVector = {
val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true)
withResource(input.strip()) { stripped =>
withResource(stripped.isTimestamp(cudfFormat)) { isDate =>
if (GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) {
if (failOnInvalid && GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) {
withResource(isDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuJsonToStructsShim, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
Expand Down Expand Up @@ -113,15 +113,16 @@ object GpuJsonScan {

val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
if (hasDates) {
GpuJsonToStructsShim.tagDateFormatSupport(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))
GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd") =>
// this is fine
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[TimestampType])
if (hasTimestamps) {
GpuJsonToStructsShim.tagTimestampFormatSupport(meta,
GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions))

GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") =>
// this is fine
Expand Down Expand Up @@ -162,16 +163,10 @@ object GpuJsonScan {
tagSupportOptions(parsedOptions, meta)

val types = readSchema.map(_.dataType)

val hasDates = TrampolineUtil.dataTypeExistsRecursively(readSchema, _.isInstanceOf[DateType])
if (hasDates) {

if (types.contains(DateType)) {
GpuTextBasedDateUtils.tagCudfFormat(meta,
GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true)

GpuJsonToStructsShim.tagDateFormatSupportFromScan(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))

// For date type, timezone needs to be checked also. This is because JVM timezone is used
// to get days offset before rebasing Julian to Gregorian in Spark while not in Rapids.
//
Expand Down Expand Up @@ -451,10 +446,6 @@ class JsonPartitionReader(
}
}

override def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
GpuJsonToStructsShim.castJsonStringToDateFromScan(input, dt, dateFormat)
}

/**
* JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification.
*
Expand Down Expand Up @@ -499,6 +490,6 @@ class JsonPartitionReader(
}
}

override def dateFormat: Option[String] = GpuJsonUtils.optionalDateFormatInRead(parsedOptions)
override def dateFormat: String = GpuJsonUtils.dateFormatInRead(parsedOptions)
override def timestampFormat: String = GpuJsonUtils.timestampFormatInRead(parsedOptions)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,28 +17,36 @@
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "334"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.{GpuCast, GpuOverrides, RapidsMeta}
import ai.rapids.cudf.{ColumnVector, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuCast

import org.apache.spark.sql.catalyst.json.GpuJsonUtils
import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy

object GpuJsonToStructsShim {
def tagDateFormatSupport(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
GpuJsonUtils.optionalDateFormatInRead(options) match {
case None | Some("yyyy-MM-dd") =>
GpuJsonUtils.dateFormatInRead(options) match {
case "yyyy-MM-dd" =>
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
Expand All @@ -50,27 +58,6 @@ object GpuJsonToStructsShim {
}
}

def tagDateFormatSupportFromScan(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
tagDateFormatSupport(meta, dateFormat)
}

def castJsonStringToDateFromScan(input: ColumnVector, dt: DType,
dateFormat: Option[String]): ColumnVector = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
withResource(input.strip()) { trimmed =>
GpuCast.castStringToDateAnsi(trimmed, ansiMode =
GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
case other =>
// should be unreachable due to GpuOverrides checks
throw new IllegalStateException(s"Unsupported dateFormat $other")
}
}

def tagTimestampFormatSupport(meta: RapidsMeta[_, _, _],
timestampFormat: Option[String]): Unit = {}

def castJsonStringToTimestamp(input: ColumnVector,
options: Map[String, String]): ColumnVector = {
withResource(Scalar.fromString(" ")) { space =>
Expand Down

This file was deleted.

Loading

0 comments on commit 90a7dca

Please sign in to comment.