Improve dateFormat support in GpuJsonScan and make tests consistent with GpuStructsToJson [databricks] #9975

Merged · 50 commits · Jan 9, 2024
Changes from 22 commits

Commits (50)
b393caa
upmerge
andygrove Dec 6, 2023
fa09f44
Revert change to csv_test
andygrove Dec 6, 2023
0caced0
upmerge
andygrove Dec 7, 2023
e661b47
Merge remote-tracking branch 'nvidia/branch-24.02' into json-scan-dat…
andygrove Dec 11, 2023
06e8db6
scalastyle
andygrove Dec 11, 2023
50c8273
introduce shim
andygrove Dec 11, 2023
9ec6bd6
remove unreachable code
andygrove Dec 11, 2023
a531a6b
fix some failures with 311
andygrove Dec 11, 2023
d4f9147
save progress
andygrove Dec 12, 2023
b6f2ff1
fix more failures with 340
andygrove Dec 12, 2023
fd50335
fix test failures with 341
andygrove Dec 12, 2023
01dd4bf
tests pass with 341
andygrove Dec 12, 2023
70011f8
Add 330 shim and fix failures in test_basic_json_read
andygrove Dec 12, 2023
c65b29f
save progress on 330 shim
andygrove Dec 12, 2023
1facac3
tests pass with 330
andygrove Dec 13, 2023
c555b09
320 shim
andygrove Dec 13, 2023
816f009
test all date formats with from_json
andygrove Dec 13, 2023
d82ae6b
remove redundant and confusing use of failOnInvalid parameter
andygrove Dec 13, 2023
d7f87dc
Revert unrelated change
andygrove Dec 13, 2023
958d4e1
Remove comment
andygrove Dec 13, 2023
92e68ba
Remove blank line
andygrove Dec 13, 2023
7a51cfe
Remove blank line
andygrove Dec 13, 2023
9f95976
Revert accidental change to test_basic_json_read
andygrove Dec 13, 2023
3df7893
Fix compilation error caused by refactor
andygrove Dec 13, 2023
0d2de96
Scala style
andygrove Dec 13, 2023
1442519
Scala style
andygrove Dec 13, 2023
b6e7c4d
update compatibility guide
andygrove Dec 13, 2023
5cf603b
move json-specific date parsing into GpuJsonScan to fix regression in…
andygrove Dec 13, 2023
fc9779e
fix regression introduced during refactor
andygrove Dec 14, 2023
6e8ad5c
fall back to CPU if timestampFormat specified in 320 shim
andygrove Dec 15, 2023
030afac
fall back to CPU if timestampFormat specified in 340 shim
andygrove Dec 15, 2023
793c731
fix ci failure with 341db
andygrove Dec 15, 2023
9bea908
Merge remote-tracking branch 'nvidia/branch-24.02' into json-scan-dat…
andygrove Dec 19, 2023
bbc0704
upmerge
andygrove Jan 2, 2024
aedfd25
upmerge
andygrove Jan 2, 2024
51ddd71
add 334 shim
andygrove Jan 2, 2024
77ad4ab
add clarifying comment
andygrove Jan 5, 2024
f3ecbc6
update copyright years to 2024
andygrove Jan 5, 2024
3b30a20
use None instead of empty string in tests
andygrove Jan 5, 2024
51a8da3
fix copyright years
andygrove Jan 5, 2024
bef3ef1
remove xfail from tests
andygrove Jan 6, 2024
51c13c2
fix regression
andygrove Jan 8, 2024
8648b0c
Update integration_tests/src/main/python/json_test.py
andygrove Jan 8, 2024
8b0b9c5
Update integration_tests/src/main/python/json_test.py
andygrove Jan 8, 2024
4b183a4
fix regression
andygrove Jan 8, 2024
a753e88
Revert "fix regression"
andygrove Jan 8, 2024
ea80821
update more tests to use None instead of empty string
andygrove Jan 8, 2024
9e9acc4
allow fallback for non-utc in test_json_read_generated_dates
andygrove Jan 8, 2024
cf22537
update more tests to use None instead of empty string
andygrove Jan 8, 2024
b5779a0
Merge remote-tracking branch 'nvidia/branch-24.02' into json-scan-dat…
andygrove Jan 8, 2024
262 changes: 164 additions & 98 deletions integration_tests/src/main/python/json_test.py
jlowe marked this conversation as resolved.

Large diffs are not rendered by default.

@@ -359,7 +359,7 @@ abstract class CSVPartitionReaderBase[BUFF <: LineBufferer, FACT <: LineBufferer
}
}

override def dateFormat: String = GpuCsvUtils.dateFormatInRead(parsedOptions)
override def dateFormat: Option[String] = Some(GpuCsvUtils.dateFormatInRead(parsedOptions))
override def timestampFormat: String = GpuCsvUtils.timestampFormatInRead(parsedOptions)
}

17 changes: 11 additions & 6 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -19,17 +19,14 @@ package com.nvidia.spark.rapids
import java.text.SimpleDateFormat
import java.time.DateTimeException
import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DecimalUtils, DType, RegexProgram, Scalar}
import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, DecimalUtils, RegexProgram, Scalar}
import ai.rapids.cudf
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.CastStrings
import com.nvidia.spark.rapids.shims.{AnsiUtil, GpuCastShims, GpuIntervalUtils, GpuTypeShims, SparkShimImpl, YearParseUtil}
import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_SECOND
@@ -1284,7 +1281,7 @@ object GpuCast {
def convertDateOrNull(
input: ColumnVector,
regex: String,
cudfFormat: String): ColumnVector = {
cudfFormat: String,
failOnInvalid: Boolean = false): ColumnVector = {

val prog = new RegexProgram(regex, CaptureGroups.NON_CAPTURE)
val isValidDate = withResource(input.matchesRe(prog)) { isMatch =>
@@ -1294,6 +1292,13 @@
}

withResource(isValidDate) { _ =>
if (failOnInvalid) {
withResource(isValidDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
}
}
}
withResource(Scalar.fromNull(DType.TIMESTAMP_DAYS)) { orElse =>
withResource(input.asTimestampDays(cudfFormat)) { asDays =>
isValidDate.ifElse(asDays, orElse)
@@ -1376,7 +1381,7 @@
}
}

private def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
def castStringToDateAnsi(input: ColumnVector, ansiMode: Boolean): ColumnVector = {
val result = castStringToDate(input)
if (ansiMode) {
// When ANSI mode is enabled, we need to throw an exception if any values could not be
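As a minimal, hypothetical sketch (not part of this PR diff), the new failOnInvalid parameter on GpuCast.convertDateOrNull could be exercised roughly like this, assuming a GPU-enabled environment and the cudf Java API:

    import ai.rapids.cudf.ColumnVector
    import com.nvidia.spark.rapids.GpuCast

    val input = ColumnVector.fromStrings("2024-01-09", "not-a-date")
    // Default behavior: rows that do not match the regex become nulls
    val dates = GpuCast.convertDateOrNull(input, "^\\d{4}-\\d{2}-\\d{2}$", "%Y-%m-%d")
    // With failOnInvalid = true, the same call would instead throw
    // DateTimeException because "not-a-date" fails validation:
    // GpuCast.convertDateOrNull(input, "^\\d{4}-\\d{2}-\\d{2}$", "%Y-%m-%d", failOnInvalid = true)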
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import java.time.DateTimeException
import java.util
import java.util.Optional

import scala.collection.mutable.ListBuffer
@@ -25,8 +25,7 @@ import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, HostColumnVector, Hos
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.DateUtils.{toStrf, TimestampFormatConversionException}
import com.nvidia.spark.rapids.jni.CastStrings
import com.nvidia.spark.rapids.shims.GpuTypeShims
import java.util
Contributor Author (andygrove):
I'm surprised that scala style didn't catch this misplaced import.

Collaborator:
I think we should stop importing Java classes, let alone pretend it is a Scala package object. IMO, having to explicitly reference the Java class on initialization makes the code clearer.
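For illustration, a minimal sketch of the convention being suggested, with hypothetical names that are not from this PR:

    // Discouraged: importing the java.util package and using it like a Scala object
    // import java.util
    // val ids = new util.ArrayList[String]()

    // Suggested: spell out the Java class explicitly on initialization
    object ImportStyleSketch {
      val ids = new java.util.ArrayList[String]()
    }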

import com.nvidia.spark.rapids.shims.{GpuJsonToStructsShim, GpuTypeShims}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
@@ -36,7 +35,7 @@ import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{ExceptionTimeParserPolicy, GpuToTimestamp, LegacyTimeParserPolicy}
import org.apache.spark.sql.rapids.{GpuToTimestamp, LegacyTimeParserPolicy}
import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -372,31 +371,11 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
}
}

def dateFormat: String
def dateFormat: Option[String]
def timestampFormat: String

def castStringToDate(input: ColumnVector, dt: DType): ColumnVector = {
castStringToDate(input, dt, failOnInvalid = true)
}

def castStringToDate(input: ColumnVector, dt: DType, failOnInvalid: Boolean): ColumnVector = {
val cudfFormat = DateUtils.toStrf(dateFormat, parseString = true)
withResource(input.strip()) { stripped =>
withResource(stripped.isTimestamp(cudfFormat)) { isDate =>
if (failOnInvalid && GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy) {
withResource(isDate.all()) { all =>
if (all.isValid && !all.getBoolean) {
throw new DateTimeException("One or more values is not a valid date")
}
}
}
withResource(stripped.asTimestamp(dt, cudfFormat)) { asDate =>
withResource(Scalar.fromNull(dt)) { nullScalar =>
isDate.ifElse(asDate, nullScalar)
}
}
}
}
GpuJsonToStructsShim.castJsonStringToDateFromScan(input, dt, dateFormat)
}

def castStringToTimestamp(
@@ -25,7 +25,7 @@ import ai.rapids.cudf
import ai.rapids.cudf.{CaptureGroups, ColumnVector, DType, NvtxColor, RegexProgram, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuJsonToStructsShim, LegacyBehaviorPolicyShim, ShimFilePartitionReaderFactory}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
@@ -113,16 +113,15 @@ object GpuJsonScan {

val hasDates = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[DateType])
if (hasDates) {
GpuJsonUtils.optionalDateFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd") =>
// this is fine
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
GpuJsonToStructsShim.tagDateFormatSupport(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))
}

val hasTimestamps = TrampolineUtil.dataTypeExistsRecursively(dt, _.isInstanceOf[TimestampType])
if (hasTimestamps) {
GpuJsonToStructsShim.tagTimestampFormatSupport(meta,
GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions))

GpuJsonUtils.optionalTimestampFormatInRead(parsedOptions) match {
case None | Some("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]") =>
// this is fine
@@ -163,10 +162,16 @@
tagSupportOptions(parsedOptions, meta)

val types = readSchema.map(_.dataType)
if (types.contains(DateType)) {

val hasDates = TrampolineUtil.dataTypeExistsRecursively(readSchema, _.isInstanceOf[DateType])
if (hasDates) {

GpuTextBasedDateUtils.tagCudfFormat(meta,
GpuJsonUtils.dateFormatInRead(parsedOptions), parseString = true)

GpuJsonToStructsShim.tagDateFormatSupportFromScan(meta,
GpuJsonUtils.optionalDateFormatInRead(parsedOptions))

// For date type, timezone needs to be checked also. This is because JVM timezone is used
// to get days offset before rebasing Julian to Gregorian in Spark while not in Rapids.
//
@@ -490,6 +495,6 @@ class JsonPartitionReader(
}
}

override def dateFormat: String = GpuJsonUtils.dateFormatInRead(parsedOptions)
override def dateFormat: Option[String] = GpuJsonUtils.optionalDateFormatInRead(parsedOptions)
override def timestampFormat: String = GpuJsonUtils.timestampFormatInRead(parsedOptions)
}
@@ -17,35 +17,28 @@
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, Scalar}
import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.GpuCast
import com.nvidia.spark.rapids.{GpuCast, RapidsMeta, GpuOverrides}

import org.apache.spark.sql.catalyst.json.GpuJsonUtils
import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy

object GpuJsonToStructsShim {
def tagDateFormatSupport(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
case dateFormat =>
meta.willNotWorkOnGpu(s"GpuJsonToStructs unsupported dateFormat $dateFormat")
}
}

def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
GpuJsonUtils.dateFormatInRead(options) match {
case "yyyy-MM-dd" =>
GpuJsonUtils.optionalDateFormatInRead(options) match {
case None | Some("yyyy-MM-dd") =>
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
@@ -57,6 +50,27 @@ object GpuJsonToStructsShim {
}
}

def tagDateFormatSupportFromScan(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
tagDateFormatSupport(meta, dateFormat)
}

def castJsonStringToDateFromScan(input: ColumnVector, dt: DType,
dateFormat: Option[String]): ColumnVector = {
dateFormat match {
case None | Some("yyyy-MM-dd") =>
withResource(input.strip()) { trimmed =>
GpuCast.castStringToDateAnsi(trimmed, ansiMode =
GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
case other =>
// should be unreachable due to GpuOverrides checks
throw new IllegalStateException(s"Unsupported dateFormat $other")
}
}

def tagTimestampFormatSupport(meta: RapidsMeta[_, _, _],
timestampFormat: Option[String]): Unit = {}

def castJsonStringToTimestamp(input: ColumnVector,
options: Map[String, String]): ColumnVector = {
withResource(Scalar.fromString(" ")) { space =>
Member:
2024 copyrights

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*** spark-rapids-shim-json-lines
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import ai.rapids.cudf.{ColumnVector, DType, Scalar}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.{DateUtils, GpuCast, GpuOverrides, RapidsMeta}

import org.apache.spark.sql.rapids.ExceptionTimeParserPolicy

object GpuJsonToStructsShim {

def tagDateFormatSupport(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
}
Member:
Do we really support all possible date formats? Or is any specified date format ignored on Spark 3.2+?

Contributor Author (andygrove):
I added a clarifying comment here:

    // dateFormat is ignored by JsonToStructs in Spark 3.2.x and 3.3.x because it just
    // performs a regular cast from string to date
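A hypothetical spark-shell snippet illustrating that comment (the dd/MM/yyyy option is made up for the example): on Spark 3.2.x/3.3.x the option has no effect, because JsonToStructs falls back to a plain string-to-date cast.

    import org.apache.spark.sql.functions.{from_json, lit}
    import org.apache.spark.sql.types.{DateType, StructField, StructType}

    val schema = StructType(Seq(StructField("d", DateType)))
    val parsed = spark.range(1).select(
      from_json(lit("""{"d":"2024-01-09"}"""), schema,
        Map("dateFormat" -> "dd/MM/yyyy")).alias("s"))
    // On Spark 3.2.x/3.3.x this still yields 2024-01-09 even though the value
    // does not match dd/MM/yyyy, because dateFormat is ignored here.
    parsed.select("s.d").show()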


def castJsonStringToDate(input: ColumnVector, options: Map[String, String]): ColumnVector = {
// dateFormat is ignored in from_json in Spark 3.2
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
GpuCast.castStringToDate(trimmed)
}
}
}

def tagDateFormatSupportFromScan(meta: RapidsMeta[_, _, _], dateFormat: Option[String]): Unit = {
}

def castJsonStringToDateFromScan(input: ColumnVector, dt: DType,
dateFormat: Option[String]): ColumnVector = {
dateFormat match {
case None =>
// legacy behavior
withResource(input.strip()) { trimmed =>
GpuCast.castStringToDateAnsi(trimmed, ansiMode =
GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
case Some(f) =>
withResource(input.strip()) { trimmed =>
// translate the Spark date format pattern into a validation regex and a
// cudf strptime format, then convert matching rows (others become null)
val regexRoot = f
.replace("yyyy", raw"\d{4}")
.replace("MM", raw"\d{1,2}")
.replace("dd", raw"\d{1,2}")
val cudfFormat = DateUtils.toStrf(f, parseString = true)
GpuCast.convertDateOrNull(trimmed, "^" + regexRoot + "$", cudfFormat,
failOnInvalid = GpuOverrides.getTimeParserPolicy == ExceptionTimeParserPolicy)
}
}
}

def tagTimestampFormatSupport(meta: RapidsMeta[_, _, _],
timestampFormat: Option[String]): Unit = {}

def castJsonStringToTimestamp(input: ColumnVector,
options: Map[String, String]): ColumnVector = {
// legacy behavior
withResource(Scalar.fromString(" ")) { space =>
withResource(input.strip(space)) { trimmed =>
// from_json doesn't respect ansi mode
GpuCast.castStringToTimestamp(trimmed, ansiMode = false)
}
}
}
}
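And a small worked example of the format translation in castJsonStringToDateFromScan above, assuming DateUtils.toStrf maps yyyy/MM/dd to the usual strptime specifiers:

    // For dateFormat = Some("dd/MM/yyyy"):
    val f = "dd/MM/yyyy"
    val regexRoot = f
      .replace("yyyy", raw"\d{4}")
      .replace("MM", raw"\d{1,2}")
      .replace("dd", raw"\d{1,2}")
    // regexRoot == "\d{1,2}/\d{1,2}/\d{4}", anchored as "^...$" for validation
    // DateUtils.toStrf("dd/MM/yyyy", parseString = true) would yield "%d/%m/%Y"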