Skip to content

Commit

Permalink
Update the legacy mode check: only take effect when reading date/time…
Browse files Browse the repository at this point in the history
…stamp column (#10074)

* Update the legacy mode check: only take effect when reading date/timestamp column
Signed-off-by: Chong Gao <[email protected]>
  • Loading branch information
res-life authored Jan 10, 2024
1 parent dfc1d97 commit 708bbac
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,8 @@

package com.nvidia.spark.rapids

import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types._

object DataTypeUtils {
def isNestedType(dataType: DataType): Boolean = dataType match {
Expand All @@ -26,4 +27,15 @@ object DataTypeUtils {

def hasNestedTypes(schema: StructType): Boolean =
schema.exists(f => isNestedType(f.dataType))

/**
* If `t` is date/timestamp type or its children have a date/timestamp type.
*
* @param t input date type.
* @return if contains date type.
*/
def hasDateOrTimestampType(t: DataType): Boolean = {
TrampolineUtil.dataTypeExistsRecursively(t, e =>
e.isInstanceOf[DateType] || e.isInstanceOf[TimestampType])
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -774,9 +774,11 @@ private case class GpuParquetFileFilterHandler(
val clipped = GpuParquetUtils.clipBlocksToSchema(clippedSchema, blocks, isCaseSensitive)
(clipped, clippedSchema)
}

val hasDateTimeInReadSchema = DataTypeUtils.hasDateOrTimestampType(readDataSchema)
val dateRebaseModeForThisFile = DateTimeRebaseUtils.datetimeRebaseMode(
footer.getFileMetaData.getKeyValueMetaData.get, datetimeRebaseMode)
footer.getFileMetaData.getKeyValueMetaData.get,
datetimeRebaseMode,
hasDateTimeInReadSchema)
val hasInt96Timestamps = isParquetTimeInInt96(fileSchema)
val timestampRebaseModeForThisFile = if (hasInt96Timestamps) {
DateTimeRebaseUtils.int96RebaseMode(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,8 @@ object DateTimeRebaseUtils {
private def rebaseModeFromFileMeta(lookupFileMeta: String => String,
modeByConfig: String,
minVersion: String,
metadataKey: String): DateTimeRebaseMode = {
metadataKey: String,
hasDateTimeInReadSchema: Boolean = true): DateTimeRebaseMode = {

// If there is no version, we return the mode specified by the config.
val mode = Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
Expand All @@ -95,7 +96,7 @@ object DateTimeRebaseUtils {
// Use the default JVM time zone for backward compatibility
TimeZone.getDefault.toZoneId
}
if (fileTimeZoneId.normalized() != GpuOverrides.UTC_TIMEZONE_ID) {
if (hasDateTimeInReadSchema && fileTimeZoneId.normalized() != GpuOverrides.UTC_TIMEZONE_ID) {
throw new UnsupportedOperationException(
"LEGACY datetime rebase mode is only supported for files written in UTC timezone. " +
s"Actual file timezone: $fileTimeZoneId")
Expand All @@ -106,9 +107,10 @@ object DateTimeRebaseUtils {
}

def datetimeRebaseMode(lookupFileMeta: String => String,
modeByConfig: String): DateTimeRebaseMode = {
modeByConfig: String,
hasDateTimeInReadSchema: Boolean = true): DateTimeRebaseMode = {
rebaseModeFromFileMeta(lookupFileMeta, modeByConfig, "3.0.0",
SPARK_LEGACY_DATETIME_METADATA_KEY)
SPARK_LEGACY_DATETIME_METADATA_KEY, hasDateTimeInReadSchema)
}

def int96RebaseMode(lookupFileMeta: String => String,
Expand Down

0 comments on commit 708bbac

Please sign in to comment.