[WIP] Move timezone check to each operator [databricks] #9482

Closed
Commits
42 commits
d8e77b2
Add test cases for timezone aware operators
Oct 19, 2023
3f781a4
Move timezone check to each operator
Oct 19, 2023
d5a6d7a
Merge branch 23.12
Oct 27, 2023
b3fa3ee
Update
Oct 27, 2023
c31b2e3
debug
Oct 27, 2023
a7c8996
debug
Oct 27, 2023
2878c5c
Add timezone test mark
Oct 27, 2023
705f8b5
Minor update
Nov 1, 2023
882b751
Fix failed cmp case on Spark311; Restore a python import; minor changes
Nov 1, 2023
aec893c
Fix failure on Databricks
Nov 2, 2023
7f81644
Update test cases for Databricks
Nov 2, 2023
bcc1f5b
Update test cases for Databricks
Nov 2, 2023
505b72e
Fix delta lake test cases.
Nov 3, 2023
07942ea
Fix delta lake test cases.
Nov 3, 2023
3033bc3
Remove the skip logic when time zone is not UTC
Nov 7, 2023
a852455
Add time zone config to set non-UTC
Nov 7, 2023
0358cd4
Add fallback case for cast_test.py
Nov 7, 2023
f6ccadd
Add fallback case for cast_test.py
Nov 7, 2023
21d5a69
Add fallback case for cast_test.py
Nov 8, 2023
e2aa9da
Add fallback case for cast_test.py
Nov 8, 2023
9eab476
Update split_list
Nov 8, 2023
e231a80
Add fallback case for cast_test.py
Nov 8, 2023
71928a0
Add fallback case for cast_test.py
Nov 8, 2023
ca23932
Add fallback cases for cmp_test.py
Nov 9, 2023
ee60bea
Add fallback tests for json_test.py
firestarman Nov 9, 2023
d403c59
add non_utc fallback for parquet_write qa_select and window_function …
thirtiseven Nov 9, 2023
dd5ad0b
Add fallback tests for conditionals_test.py
winningsix Nov 9, 2023
058e13e
Add fallback cases for collection_ops_test.py
Nov 9, 2023
fc3a678
add fallback tests for date_time_test
thirtiseven Nov 9, 2023
938c649
clean up spark_session.py
thirtiseven Nov 9, 2023
befa39d
Add fallback tests for explain_test and csv_test
winningsix Nov 9, 2023
cf2c621
Update test case
Nov 9, 2023
c298d5f
update test case
Nov 9, 2023
09e772c
Add default value
Nov 10, 2023
f43a8f9
Remove useless is_tz_utc
Nov 10, 2023
5882cc3
Fix fallback cases
Nov 10, 2023
7a53dc2
Add bottom check for time zone; Fix ORC check
Nov 13, 2023
7bd9ef8
By default, ExecCheck do not check UTC time zone
Nov 13, 2023
9817c4e
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
f8505b7
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
fa1c84d
For common expr like AttributeReference, just skip the UTC checking
Nov 13, 2023
fbbbd5b
Update test cases
Nov 14, 2023
87 changes: 86 additions & 1 deletion integration_tests/src/main/python/date_time_test.py
@@ -13,8 +13,9 @@
# limitations under the License.

Collaborator: nit: one line break after license header.

Collaborator (author): done

import pytest
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_and_cpu_error

from data_gen import *
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, assert_gpu_and_cpu_error, assert_spark_exception, with_gpu_session
from datetime import date, datetime, timezone
from marks import ignore_order, incompat, allow_non_gpu
from pyspark.sql.types import *
Expand Down Expand Up @@ -558,3 +559,87 @@ def test_timestamp_millis_long_overflow():
def test_timestamp_micros(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("timestamp_micros(a)"))


# used by timezone test cases
def get_timezone_df(spark):
schema = StructType([
StructField("ts_str_col", StringType()),
StructField("long_col", LongType()),
StructField("ts_col", TimestampType()),
StructField("date_col", DateType()),
StructField("date_str_col", StringType()),
])
data = [
('1970-01-01 00:00:00', 0, datetime(1971, 1, 1), date(1971, 1, 1), '1971-01-01'),
('1970-01-01 00:00:00', 0, datetime(1971, 1, 1), date(1971, 1, 1), '1971-01-01'),
]
    return spark.createDataFrame(SparkContext.getOrCreate().parallelize(data), schema)

# used by timezone test cases; lists all the SQLs that are impacted by a non-UTC timezone
time_zone_sql_conf_pairs = [
Collaborator: nit: There are some timezone-related functions (not supported yet) mentioned on the Spark built-in function website. We could add a comment mentioning them here:

convert_timezone
-- SELECT convert_timezone('Europe/Brussels', 'America/Los_Angeles', timestamp_ntz'2021-12-06 00:00:00');
current_timezone()
make_timestamp()
make_timestamp_ltz()

Collaborator (author): current_timezone just returns the session timezone, which the Spark config "spark.sql.session.timeZone" sets, so we can ignore it for this PR. MakeTimestamp and ConvertTimezone are recorded in the follow-on issue: #9570

("select minute(ts_col) from tab", {}),
("select second(ts_col) from tab", {}),
("select hour(ts_col) from tab", {}),
("select date_col + (interval 10 days 3 seconds) from tab", {}),
("select date_format(ts_col, 'yyyy-MM-dd HH:mm:ss') from tab", {}),
("select unix_timestamp(ts_col) from tab", {"spark.rapids.sql.improvedTimeOps.enabled": "true"}),
("select to_unix_timestamp(ts_str_col) from tab", {"spark.rapids.sql.improvedTimeOps.enabled": "false"}),
("select to_unix_timestamp(ts_col) from tab", {"spark.rapids.sql.improvedTimeOps.enabled": "true"}),
("select to_date(date_str_col, 'yyyy-MM-dd') from tab", {}), # test GpuGetTimestamp
("select to_date(date_str_col) from tab", {}),
("select from_unixtime(long_col, 'yyyy-MM-dd HH:mm:ss') from tab", {}),
("select cast(ts_col as string) from tab", {}), # cast
("select cast(ts_col as date) from tab", {}), # cast
("select cast(date_col as TIMESTAMP) from tab", {}), # cast
("select to_timestamp(ts_str_col) from tab", {"spark.rapids.sql.improvedTimeOps.enabled": "false"}),
("select to_timestamp(ts_str_col) from tab", {"spark.rapids.sql.improvedTimeOps.enabled": "true"}),
]
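The operators above are flagged because parsing or formatting a local wall-clock value depends on the session time zone: the same string maps to different instants under different zones. A minimal pure-Python sketch of that sensitivity (illustrative only, not Spark or plugin code):

```python
from datetime import datetime, timedelta, timezone

# Parse the same wall-clock string under two zones; the resulting
# epoch seconds differ by exactly the zone offset.
naive = datetime.strptime("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
utc_epoch = naive.replace(tzinfo=timezone.utc).timestamp()
cst_epoch = naive.replace(tzinfo=timezone(timedelta(hours=8))).timestamp()

assert utc_epoch == 0.0
assert cst_epoch == -8 * 3600  # midnight in +08:00 is 8 hours before midnight UTC
```

This is exactly why a GPU operator that hard-codes UTC must fall back when the session zone differs.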


@allow_non_gpu("ProjectExec")
@pytest.mark.parametrize('sql, extra_conf', time_zone_sql_conf_pairs)
def test_timezone_for_operators_with_non_utc(sql, extra_conf):
# timezone is non-utc, should fallback to CPU
timezone_conf = {"spark.sql.session.timeZone": "+08:00",
Collaborator: Should we make the time zone string a param to the test? Just because I would like to test a few more time zones than just +08:00.

Collaborator (author): Done

"spark.rapids.sql.hasExtendedYearValues": "false",
"spark.rapids.sql.castStringToTimestamp.enabled": "true"}
all_conf = copy_and_update(timezone_conf, extra_conf)
def gen_sql_df(spark):
df = get_timezone_df(spark)
df.createOrReplaceTempView("tab")
return spark.sql(sql)
assert_gpu_fallback_collect(gen_sql_df, "ProjectExec", all_conf)
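On the reviewer's point about covering more zones than '+08:00': a fixed offset never exercises DST transitions, which named zones do. A quick stdlib check (illustrative; assumes the tzdata database is available on the test machine):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

la = ZoneInfo("America/Los_Angeles")
# The UTC offset of a DST-observing zone changes with the date,
# unlike a fixed offset such as '+08:00'.
winter_offset = datetime(2023, 1, 1, tzinfo=la).utcoffset()
summer_offset = datetime(2023, 7, 1, tzinfo=la).utcoffset()

assert winter_offset != summer_offset  # PST (-08:00) vs PDT (-07:00)
```

Parametrizing the tests over at least one DST-observing zone would catch offset bugs a fixed offset cannot.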


@pytest.mark.parametrize('sql, conf', time_zone_sql_conf_pairs)
def test_timezone_for_operators_with_utc(sql, conf):
# timezone is utc, should be supported by GPU
timezone_conf = {"spark.sql.session.timeZone": "UTC",
"spark.rapids.sql.hasExtendedYearValues": "false",
"spark.rapids.sql.castStringToTimestamp.enabled": "true",}
conf = copy_and_update(timezone_conf, conf)
def gen_sql_df(spark):
df = get_timezone_df(spark)
df.createOrReplaceTempView("tab")
return spark.sql(sql)
assert_gpu_and_cpu_are_equal_collect(gen_sql_df, conf)


@allow_non_gpu("ProjectExec")
def test_timezone_for_operator_from_utc_timestamp_with_non_utc():
# timezone is non-utc, should fallback to CPU
def gen_sql_df(spark):
df = get_timezone_df(spark)
df.createOrReplaceTempView("tab")
return spark.sql("select from_utc_timestamp(ts_col, '+08:00') from tab")
assert_gpu_fallback_collect(gen_sql_df, "ProjectExec")


def test_timezone_for_operator_from_utc_timestamp_with_utc():
# timezone is utc, should be supported by GPU
def gen_sql_df(spark):
df = get_timezone_df(spark)
df.createOrReplaceTempView("tab")
return spark.sql("select from_utc_timestamp(ts_col, '+00:00') from tab").collect()
with_gpu_session(gen_sql_df)
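For reference, from_utc_timestamp reinterprets a UTC wall-clock value in the target zone, i.e. it shifts by the zone offset. A pure-Python sketch of that semantics for a fixed offset (the helper name is hypothetical, not a Spark API):

```python
from datetime import datetime, timedelta

def shift_from_utc(ts, offset_hours):
    # Hypothetical helper mirroring from_utc_timestamp for a fixed-offset
    # zone like '+08:00': add the offset to the UTC wall clock.
    return ts + timedelta(hours=offset_hours)

shifted = shift_from_utc(datetime(1971, 1, 1, 0, 0, 0), 8)
assert shifted == datetime(1971, 1, 1, 8, 0, 0)   # '+08:00'
assert shift_from_utc(shifted, 0) == shifted      # '+00:00' is a no-op
```

This matches the two tests above: a non-zero offset must fall back, while '+00:00' stays on the GPU.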
@@ -90,6 +90,9 @@ abstract class CastExprMetaBase[INPUT <: UnaryExpression with TimeZoneAwareExpre
recursiveTagExprForGpuCheck()
}

// tag time zone by Cast itself, do not delegate to parent class
override def tagTimeZoneBySelf: Boolean = true

protected def recursiveTagExprForGpuCheck(
fromDataType: DataType = fromType,
toDataType: DataType = toType,
@@ -177,9 +180,6 @@ abstract class CastExprMetaBase[INPUT <: UnaryExpression with TimeZoneAwareExpre
def buildTagMessage(entry: ConfEntry[_]): String = {
s"${entry.doc}. To enable this operation on the GPU, set ${entry.key} to true."
}

// timezone tagging in type checks is good enough, so always false
override protected val needTimezoneTagging: Boolean = false
}

object CastOptions {
@@ -669,9 +669,7 @@ object GpuOverrides extends Logging {
case FloatType => true
case DoubleType => true
case DateType => true
case TimestampType =>
TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
case TimestampType => true
Collaborator: Do we need to consider the timezone check for the scan and writer parts? AFAIK, when scanning data from Parquet, spark.sql.session.timeZone is supposed to be respected. If so, we should add some Python tests as well.

Collaborator (author): This check is used by InternalColumnarRddConverter and HostToGpuCoalesceIterator. Coalesce can handle non-UTC timestamps; not sure about InternalColumnarRddConverter, but it seems to be OK as well.

Collaborator: I think we will need to check these. For me, anything that does not have a test showing it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC. Parquet, for example, has the rebase mode for older timestamps, which requires knowing the timezone to handle properly.

case StringType => true
case dt: DecimalType if allowDecimal => dt.precision <= DType.DECIMAL64_MAX_PRECISION
case NullType => allowNull
13 changes: 6 additions & 7 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -374,13 +374,12 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
case Some(value) => ZoneId.of(value)
case None => throw new RuntimeException(s"Driver time zone cannot be determined.")
}
if (TypeChecks.areTimestampsSupported(driverTimezone)) {
Collaborator: This may be off-topic, but considering the configuration spark.sql.session.timeZone, should both the driver and executor respect it? If so, do we still need the check for a driver/executor timezone mismatch?

res-life (author), Oct 27, 2023: Here driverTimezone comes from the driver's ZoneId.systemDefault(), not from spark.sql.session.timeZone (refer to the PR); Spark itself does not have this kind of check. But for spark-rapids, we check that the executor and driver have the same JVM time zone. As for whether we still need the mismatch check: I think yes, because we want to avoid the issue:

val executorTimezone = ZoneId.systemDefault()
if (executorTimezone.normalized() != driverTimezone.normalized()) {
throw new RuntimeException(s" Driver and executor timezone mismatch. " +
s"Driver timezone is $driverTimezone and executor timezone is " +
s"$executorTimezone. Set executor timezone to $driverTimezone.")
}

val executorTimezone = ZoneId.systemDefault()
if (executorTimezone.normalized() != driverTimezone.normalized()) {
throw new RuntimeException(s" Driver and executor timezone mismatch. " +
s"Driver timezone is $driverTimezone and executor timezone is " +
s"$executorTimezone. Set executor timezone to $driverTimezone.")
}

GpuCoreDumpHandler.executorInit(conf, pluginContext)
44 changes: 22 additions & 22 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable

import com.nvidia.spark.rapids.shims.{DistributionUtil, SparkShimImpl}

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryExpression, ComplexTypeMergingExpression, Expression, QuaternaryExpression, String2TrimExpression, TernaryExpression, TimeZoneAwareExpression, UnaryExpression, WindowExpression, WindowFunction}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ImperativeAggregate, TypedImperativeAggregate}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -1071,37 +1071,37 @@ abstract class BaseExprMeta[INPUT <: Expression](

val isFoldableNonLitAllowed: Boolean = false

/**
* Whether to tag a TimeZoneAwareExpression for timezone after all the other tagging
* is done.
* By default a TimeZoneAwareExpression always requires the timezone tagging, but
* there are some exceptions, e.g. 'Cast', who requires timezone tagging only when it
* has timezone sensitive type as input or output.
*
* Override this to match special cases.
*/
protected def needTimezoneTagging: Boolean = {
// A TimeZoneAwareExpression with no timezone sensitive types as input/output will
// escape from the timezone tagging in the prior type checks. So ask for tagging here.
// e.g. 'UnixTimestamp' with 'DateType' as the input, timezone will be taken into
// account when converting a Date to a Long.
!(dataType +: childExprs.map(_.dataType)).exists(TypeChecks.isTimezoneSensitiveType)
}

final override def tagSelfForGpu(): Unit = {
if (wrapped.foldable && !GpuOverrides.isLit(wrapped) && !isFoldableNonLitAllowed) {
willNotWorkOnGpu(s"Cannot run on GPU. Is ConstantFolding excluded? Expression " +
s"$wrapped is foldable and operates on non literals")
}
rule.getChecks.foreach(_.tag(this))
tagExprForGpu()
wrapped match {
case tzAware: TimeZoneAwareExpression if needTimezoneTagging =>
checkTimeZoneId(tzAware.zoneId)
case _ => // do nothing

// if expr is time zone aware and GPU does not support non UTC tz for this expr yet,
// ensure it's in UTC tz
if (!tagTimeZoneBySelf && isTimeZoneAwareExpr && !supportsNonUTCTimeZone) {
checkTimeZoneId(expr.asInstanceOf[TimeZoneAwareExpression].zoneId)
}
}

// if the wrapped expression is time zone aware
private final def isTimeZoneAwareExpr: Boolean = expr.isInstanceOf[TimeZoneAwareExpression]

/**
   * Whether the GPU supports non-UTC time zones. Each expression that supports non-UTC
   * time zones should override this method to return true.
*/
def supportsNonUTCTimeZone: Boolean = false

/**
   * Some expressions, such as Cast, are time zone aware, but the time zone check can be
   * skipped for some input/output types, like cast(int as long). Such expressions should
   * override this method to return true, meaning the expression meta checks the time
   * zone itself.
*/
def tagTimeZoneBySelf: Boolean = false

/**
* Called to verify that this expression will work on the GPU. For most expressions without
* extra checks all of the checks should have already been done.
20 changes: 13 additions & 7 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -363,8 +363,7 @@ final class TypeSig private(
case FloatType => check.contains(TypeEnum.FLOAT)
case DoubleType => check.contains(TypeEnum.DOUBLE)
case DateType => check.contains(TypeEnum.DATE)
case TimestampType if check.contains(TypeEnum.TIMESTAMP) =>
TypeChecks.areTimestampsSupported()
res-life (author), Oct 27, 2023: Originally invoked by the shuffle meta, FileFormatChecks, AST tagging, and others.

  • Shuffle meta: it is safe to remove this check, because shuffle definitely supports non-UTC timezones.
  • FileFormatChecks: Spark always writes Parquet with UTC timestamps, so it is safe. For ORC, Spark maps the ORC timestamp-with-local-time-zone type to the Spark type TIMESTAMP_NTZ (no time zone), and spark-rapids does not currently support TIMESTAMP_NTZ, so it is safe to remove the check. Refer to link.
  • AST tagging: not sure whether removing this UTC check is OK; needs investigation.

case TimestampType => check.contains(TypeEnum.TIMESTAMP)
case StringType => check.contains(TypeEnum.STRING)
case dt: DecimalType =>
check.contains(TypeEnum.DECIMAL) &&
@@ -841,10 +840,6 @@ object TypeChecks {
areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
}

def isTimezoneSensitiveType(dataType: DataType): Boolean = {
dataType == TimestampType
}

def timezoneNotSupportedString(dataType: DataType): String = {
s"$dataType is not supported with timezone settings: (JVM:" +
s" ${ZoneId.systemDefault()}, session: ${SQLConf.get.sessionLocalTimeZone})." +
@@ -1502,7 +1497,18 @@ class CastChecks extends ExprChecks {

def gpuCanCast(from: DataType, to: DataType): Boolean = {
val (checks, _) = getChecksAndSigs(from)
checks.isSupportedByPlugin(to)
checks.isSupportedByPlugin(to) && gpuCanCastConsiderTimezone(from, to)
}

  def gpuCanCastConsiderTimezone(from: DataType, to: DataType): Boolean = {
// remove this check after non UTC timezone is supported for cast
(from, to) match {
case (StringType, TimestampType | DateType) => TypeChecks.areTimestampsSupported()
case (TimestampType | DateType, StringType) => TypeChecks.areTimestampsSupported()
case (DateType, TimestampType) => TypeChecks.areTimestampsSupported()
case (TimestampType, DateType) => TypeChecks.areTimestampsSupported()
case _ => true
}
}
}

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
Collaborator: Not touched?

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1045,6 +1045,7 @@ class FromUTCTimestampExprMeta(
extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) {

override def tagExprForGpu(): Unit = {
// remove this check after non-UTC timezone is supported
extractStringLit(expr.right) match {
case None =>
willNotWorkOnGpu("timezone input must be a literal string")