[WIP] Move timezone check to each operator [databricks] #9482
Conversation
Signed-off-by: Chong Gao <[email protected]>
It is more than just those operators. In Spark, time-zone-aware code is controlled by TimeZoneAwareExpression. Notably, most of these implementations are in datetimeExpressions.scala, but there are a number outside of it (this came from Spark 3.5.0, using IntelliJ to find all of the implementations of it).
Many of these we don't have GPU equivalents for, and in some cases we do have GPU versions. For all of the others we need to make sure that they are covered, and if they don't support timestamps or don't need to worry about the time zone for some reason, we need to document it and preferably file a follow-on issue to come back and implement the timestamp functionality if it is missing.
There are also some execs that use timezones that we might need to be careful of, like CSV and JSON parsing.
We should make sure that we test all of our code with a different time zone, not just the handful of tests that we expect to fall back. Can we update the tests that we expect not to support a different time zone to skip themselves if the timezone is not what they expect? And for all of them, have a test that skips itself if the timezone is UTC, but verifies that we fall back to the CPU in those cases.
That should help us know exactly what we need to update to support timezones.
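A rough sketch of the suggested pairing, assuming the integration-test helpers that already exist in this repo (is_utc, allow_non_gpu, assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, unary_op_df, TimestampGen) and using hour() purely as an illustrative example, not as the actual tests in this PR:

    import pytest
    from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
    from data_gen import unary_op_df, TimestampGen
    from marks import allow_non_gpu
    from conftest import is_utc  # assumed location of the helper

    # Correctness test: only runs in the timezone the operator supports today (UTC).
    @pytest.mark.skipif(not is_utc(), reason="hour() currently only supports UTC")
    def test_hour_utc():
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: unary_op_df(spark, TimestampGen()).selectExpr("hour(a)"))

    # Fallback test: only runs in a non-UTC timezone and verifies we fall back to
    # the CPU instead of producing wrong answers on the GPU.
    @allow_non_gpu('ProjectExec')
    @pytest.mark.skipif(is_utc(), reason="only meaningful when the session timezone is not UTC")
    def test_hour_non_utc_fallback():
        assert_gpu_fallback_collect(
            lambda spark: unary_op_df(spark, TimestampGen()).selectExpr("hour(a)"),
            cpu_fallback_class_name='Hour')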
@@ -1733,6 +1762,11 @@ object GpuOverrides extends Logging {
      TypeSig.TIMESTAMP, TypeSig.TIMESTAMP),
    (second, conf, p, r) => new UnaryExprMeta[Second](second, conf, p, r) {
      override def tagExprForGpu(): Unit = {
Could we try to have a TimeZoneAwareExprMeta, or something similar, that makes it super simple to do this? We might even be able to bake it into ExprMeta itself, just by checking if the class that this wraps is also TimeZoneAware.
I'm guessing the best approach is to put it directly in ExprMeta, since otherwise we would have to mix in the TimeZoneAwareExprMeta for the different functions. I'm guessing that functions requiring a timezone will span the gamut of Unary/Binary/Ternary/Quaternary/Agg/etc.
Maybe wrap the check in a method and override it whenever a function starts supporting alternate timezones.
Looks like GpuCast will be a first exception to this idea: #6835
I'm testing all the existing test cases with a non-UTC time zone config added, to identify all the failing cases:

    def _set_all_confs(conf):
        _spark.conf.set("spark.sql.session.timeZone", "+08:00")
Then I'll update the failed cases.
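A self-contained sketch of that experiment (not the actual _set_all_confs helper; the SparkSession is passed in explicitly here): layer one non-UTC session timezone on top of whatever configs a test asks for, so every existing test runs in a non-UTC zone.

    from pyspark.sql import SparkSession

    def set_all_confs_with_non_utc(spark: SparkSession, conf: dict, tz: str = "+08:00"):
        # Apply the per-test configs, forcing a non-UTC session timezone unless the
        # test explicitly sets one itself.
        confs = dict(conf or {})
        confs.setdefault("spark.sql.session.timeZone", tz)
        for key, value in confs.items():
            spark.conf.set(key, value)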
@@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
nit: one line break after license header.
done
@@ -840,7 +839,7 @@ object TypeChecks {
    areTimestampsSupported(ZoneId.systemDefault()) &&
      areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
  }
nit: extra space.
return spark.createDataFrame(SparkContext.getOrCreate().parallelize(data), schema)

# used by timezone test cases, specify all the sqls that will be impacted by non-utc timezone
time_zone_sql_conf_pairs = [
nit: There are some timezone-related functions (not supported yet) mentioned on the Spark built-in functions page. We could add some comments mentioning them here:
convert_timezone
-- SELECT convert_timezone('Europe/Brussels', 'America/Los_Angeles', timestamp_ntz'2021-12-06 00:00:00');
current_timezone()
make_timestamp()
make_timestamp_ltz()
For current_timezone, it just returns the session timezone, so we can ignore it for this PR. The Spark config "spark.sql.session.timeZone" sets this value.
For MakeTimestamp and ConvertTimezone, they are recorded in this follow-on issue: #9570
pom.xml (outdated)
@@ -1045,7 +1045,7 @@
  <arg>-Yno-adapted-args</arg>
  <arg>-Ywarn-unused:imports,locals,patvars,privates</arg>
  <arg>-Xlint:missing-interpolator</arg>
  <arg>-Xfatal-warnings</arg>
  <!-- <arg>-Xfatal-warnings</arg> -->
Nit: Revert this back when we try to commit it.
@@ -374,13 +374,12 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
      case Some(value) => ZoneId.of(value)
      case None => throw new RuntimeException(s"Driver time zone cannot be determined.")
    }
    if (TypeChecks.areTimestampsSupported(driverTimezone)) {
Maybe off-topic: considering the configuration spark.sql.session.timeZone, should both the driver and executor respect it? Then do we still need the check for a driver/executor timezone mismatch?
Considering the configuration spark.sql.session.timeZone, should both driver and executor respect it?

Here driverTimezone comes from the driver's ZoneId.systemDefault(), not from spark.sql.session.timeZone, refer to: PR. Spark itself does not have this kind of check, but for our spark-rapids we check that the executor and driver have the same JVM time zone.

Then do we still need the check on driver and executor's timezone mismatch?

I think yes, because we want to avoid the issue.
case TimestampType =>
  TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
    TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
case TimestampType => true
Do we need to consider the timezone check for the scan and writer parts? AFAIK, when scanning data from Parquet, spark.sql.session.timeZone is supposed to be respected.
If it applies, we should add some Python tests as well.
This check is used by InternalColumnarRddConverter and HostToGpuCoalesceIterator. Coalesce can handle non-UTC timestamps. Not sure about InternalColumnarRddConverter, but it seems it's also OK.
I think we will need to check these. For me, anything that does not have a test that shows it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC.
Parquet, for example, has the rebase mode for older timestamps that requires knowing the timezone to handle properly.
build
@@ -363,8 +363,7 @@ final class TypeSig private(
    case FloatType => check.contains(TypeEnum.FLOAT)
    case DoubleType => check.contains(TypeEnum.DOUBLE)
    case DateType => check.contains(TypeEnum.DATE)
    case TimestampType if check.contains(TypeEnum.TIMESTAMP) =>
      TypeChecks.areTimestampsSupported()
Originally this was invoked by the shuffle meta, FileFormatChecks, AST tagging, and others.
- Shuffle meta: it's safe to remove this check, because shuffle definitely supports non-UTC timezones.
- FileFormatChecks: Spark always writes Parquet with UTC timestamps, so it's safe. For ORC, Spark maps the ORC type "timestamp with local time zone" to the Spark type TIMESTAMP_NTZ (with no time zone). spark-rapids does not support TIMESTAMP_NTZ currently, so it's safe to remove the check. Refer to link.
- AST tagging: not sure if removing this UTC check is OK, need to investigate.
Took a quick look at cudf. For AST, I noticed timezone info is not respected yet.
Still WIP, need to check more exprs.
Filed a sub-issue: #9570
@pytest.mark.parametrize('sql, extra_conf', time_zone_sql_conf_pairs)
def test_timezone_for_operators_with_non_utc(sql, extra_conf):
    # timezone is non-utc, should fall back to CPU
    timezone_conf = {"spark.sql.session.timeZone": "+08:00",
Should we make the time zone string a param to the test? Just because I would like to test a few more time zones than just +08:00
Done
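A hedged sketch of what the parametrized version could look like, reusing the existing helpers (assert_gpu_fallback_collect, allow_non_gpu, unary_op_df, TimestampGen); the zone list and the minute() example are placeholders, not the final choices:

    import pytest
    from asserts import assert_gpu_fallback_collect
    from data_gen import unary_op_df, TimestampGen
    from marks import allow_non_gpu

    # Hypothetical set of zones to exercise; the real list would be agreed on in review.
    non_utc_time_zones = ["+08:00", "Asia/Shanghai", "America/Los_Angeles"]

    @allow_non_gpu('ProjectExec')
    @pytest.mark.parametrize('time_zone', non_utc_time_zones)
    def test_minute_fallback_for_non_utc(time_zone):
        # With a non-UTC session timezone the expression should fall back to the CPU.
        conf = {"spark.sql.session.timeZone": time_zone}
        assert_gpu_fallback_collect(
            lambda spark: unary_op_df(spark, TimestampGen()).selectExpr("minute(a)"),
            cpu_fallback_class_name='Minute',
            conf=conf)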
case TimestampType =>
  TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
    TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
case TimestampType => true
I think we will need to check these. For me, anything that does not have a test that shows it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC.
Parquet, for example, has the rebase mode for older timestamps that requires knowing the timezone to handle properly.
Added a pytest mark.
build
Here is the check of UTC for the rebase mode for older timestamps:

    SparkShimImpl.parquetRebaseWrite(sqlConf) match {
      case "EXCEPTION" | "CORRECTED" => // Good
      case "LEGACY" =>
        if (!TypeChecks.areTimestampsSupported()) {
          meta.willNotWorkOnGpu("Only UTC timezone is supported in LEGACY rebase mode. " +
            s"Current timezone settings: (JVM : ${ZoneId.systemDefault()}, " +
            s"session: ${SQLConf.get.sessionLocalTimeZone}). " +
            " Set both of the timezones to UTC to enable LEGACY rebase support.")
        }

I think it's safe to remove the following:

    case TimestampType =>
      TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
        TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
Looks good. I would like to see the following either in this PR or in a follow-on PR.
1. How the time zone testing is inserted into the CI runs, so we can continue to verify that it works as expected.
2. Whether there is a way for us to test more time zones than just UTC and Asia/Shanghai. Perhaps we can randomly select one from a list of them based off of a random seed that can be passed into the tests. But this should probably be a follow-on issue (see the sketch below).
3. For all of the tests that are skipped because of the time zone issue, an automated test that verifies that we fell back to the CPU for them instead of producing the wrong answer.
Number 3 I see as a blocker for us to ship a product with this patch in it. We cannot ship something with data corruption in it. I am fine with checking this in for now so long as we have manually verified that the tests failed because we fell back to the CPU. But I want automated tests ASAP.
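A small sketch of the idea in item 2, assuming the seed is handed to the tests through an environment variable (the variable names and the zone list are made up for illustration):

    import os
    import random

    _CANDIDATE_ZONES = ["UTC", "Asia/Shanghai", "America/Los_Angeles", "Europe/Amsterdam"]

    def pick_session_time_zone() -> str:
        # An explicit zone wins, which makes it easy to reproduce a failing run.
        override = os.environ.get("TEST_TZ")
        if override:
            return override
        # Otherwise pick one zone deterministically from the seed for this CI run.
        seed = int(os.environ.get("TEST_TZ_SEED", "0"))
        return random.Random(seed).choice(_CANDIDATE_ZONES)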
@@ -354,7 +359,9 @@ def get_sequence_data(gen, len):
        SparkContext.getOrCreate().parallelize(get_sequence_data(data_gen, length)),
        mixed_schema)

@disable_timezone_test
nit: Why two of these?
done
build
Talked to Peixin: CI will pass an environment variable containing the timezone, and I'll update the test cases to use this environment variable to set the session timezone. Spark resolves the timezone for time-zone-aware expressions from the session config:

    object ResolveTimeZone extends Rule[LogicalPlan] {
      private val transformTimeZoneExprs: PartialFunction[Expression, Expression] = {
        case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
          e.withTimeZone(conf.sessionLocalTimeZone)
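A minimal sketch of how the CI-provided environment variable might be consumed, assuming a variable name of TEST_TIME_ZONE (the actual name would be whatever CI exports):

    import os
    from pyspark.sql import SparkSession

    def apply_ci_time_zone(spark: SparkSession) -> str:
        # Copy the timezone CI exported into the Spark session config so that
        # every test runs under that zone.
        tz = os.environ.get("TEST_TIME_ZONE", "UTC")  # assumed variable name
        spark.conf.set("spark.sql.session.timeZone", tz)
        return tz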
Talked to Peixin: we'd better not use a random timezone, since a random timezone will cause confusion.
About automating the tests that verify fallback to the CPU, currently a test looks like:

    @disable_timezone_test
    @pytest.mark.parametrize('start_gen,stop_gen', sequence_normal_no_step_integral_gens, ids=idfn)
    def test_sequence_without_step(start_gen, stop_gen):
        assert_gpu_and_cpu_are_equal_collect(
            lambda spark: two_col_df(spark, start_gen, stop_gen).selectExpr(
                "sequence(a, b)",
                "sequence(a, 20)",
                "sequence(20, b)"))

It's hard to automatically generate a case like:

    @pytest.mark.parametrize('start_gen,stop_gen', sequence_normal_no_step_integral_gens, ids=idfn)
    def test_sequence_without_step(start_gen, stop_gen):
        assert_gpu_fallback_collect(
            lambda spark: two_col_df(spark, start_gen, stop_gen).selectExpr(
                "sequence(a, b)",
                "sequence(a, 20)",
                "sequence(20, b)"),
            cpu_fallback_class_name = "Sequence"  # We should analyze the code to determine that the operator is "Sequence"
        )

And after we finish the timezone feature, this case should be removed.
Some tests failed on Databricks:
More info:
If I set the rebase mode to "CORRECTED", then:
Not sure why Databricks has different behavior.

    # set the int96 rebase mode values because its LEGACY in databricks which will preclude this op from running on GPU
    'spark.sql.legacy.parquet.int96RebaseModeInWrite' : 'CORRECTED',
    'spark.sql.legacy.parquet.int96RebaseModeInRead' : 'CORRECTED'}

Do not know why some test code needs to set extra config to make the test cases pass.
This is the reason:
So you need to set your Spark session timezone to UTC too. On Databricks, the default rebase mode is LEGACY.
Signed-off-by: Firestarman <[email protected]>
…tests Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@@ -204,25 +205,66 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
            .json(data_path),
        conf=updated_conf)

@allow_non_gpu('FileSourceScanExec', 'ProjectExec')
@allow_non_gpu('FileSourceScanExec', 'BatchScanExec')
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
should be xfail?
We are testing the fallback logic for a non-UTC TZ, and will remove this in the future.
For a UTC TZ, just skipping is OK.
Changes about checker:
Other changes:
After this commit, more failed cases occur:
My plan:

    if (TypeChecks.areTimestampsSupported(driverTimezone)) {
      val executorTimezone = ZoneId.systemDefault()
      if (executorTimezone.normalized() != driverTimezone.normalized()) {
        throw new RuntimeException(s" Driver and executor timezone mismatch. " +
          s"Driver timezone is $driverTimezone and executor timezone is " +
          s"$executorTimezone. Set executor timezone to $driverTimezone.")
      }
    }
I'll update this PR to xfail all the failing cases. And please review these first:
There is kind of too much going on here. We are changing the DB tests to be consistent with the non-DB tests, but I don't have confidence that we did it right. We are splitting up the timezone checks in the plugin, and we are changing all of the tests to support testing in different time zones. Can we split this up into at least 3 separate PRs. Doing it all at once is too much for me to really follow.
@@ -158,6 +229,22 @@ def test_cast_string_ts_valid_format(data_gen):
        conf = {'spark.rapids.sql.hasExtendedYearValues': 'false',
                'spark.rapids.sql.castStringToTimestamp.enabled': 'true'})

@allow_non_gpu('ProjectExec')
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from StringType to TimeStampType")
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
For the fallback tests we don't need to do all of the combinations. We just need one to be sure that we are falling back.
@allow_non_gpu('ProjectExec')
@pytest.mark.skipif(is_before_spark_320(), reason="Spark versions(< 320) not support Ansi mode when casting string to date")
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from StringType to DateType")
def test_cast_string_date_valid_ansi_for_non_utc():
nit: I'm not sure that we need ANSI fallback tests for cast. The fallback tests can be far fewer than the "validate we do the right thing" tests. We just need to cover the operator/types when we expect a fallback to happen.
@@ -294,14 +381,37 @@ def _assert_cast_to_string_equal (data_gen, conf):
        conf
    )

# split all_array_gens_for_cast_to_string
# remove below split and merge tests: "TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from Date/Timestamp to String"
gens_for_non_utc_strs = [
nit: Again here I think we really only need Array(date) and Array(Timestamp). The rest are nice, but not required.
# currently does not support. On Databricks, the default datetime rebase mode is LEGACY,
# which is different from regular Spark. Some of the cases will fail if the timezone is non-UTC on DB.
# The following configs are for DB and ensure the rebase mode is not LEGACY on DB.
writer_confs_for_DB = {
I still don't want this in here. I am fine if we xfail the DB write tests and point to why when the timezone is not UTC. But I don't want to change what we are testing unless we go through each test and verify that we are not losing coverage. This PR is already big enough. If you want to do this change it should be split off into another PR.
@@ -1728,11 +1726,11 @@ object GpuOverrides extends Logging {
        GpuMinute(expr)
      }),
    expr[Second](
nit: not needed.
@@ -363,8 +363,7 @@ final class TypeSig private(
    case FloatType => check.contains(TypeEnum.FLOAT)
    case DoubleType => check.contains(TypeEnum.DOUBLE)
    case DateType => check.contains(TypeEnum.DATE)
    case TimestampType if check.contains(TypeEnum.TIMESTAMP) =>
      TypeChecks.areTimestampsSupported()
Took a quick look at cudf. For AST, I noticed timezone info is not respected yet.
@@ -1,5 +1,5 @@
/*
 * Copyright (c) 2021-2022, NVIDIA CORPORATION.
 * Copyright (c) 2021-2023, NVIDIA CORPORATION.
Not touched?
For 1 and 2 here, I've filed #9737 to track them separately. I think this will be important. The issue is that we need to really ensure that there is no data corruption when merging many of the changes here.
Replace this with #9719
closes #6832
Changes:
- Move the timezone check to each operator in datetimeExpressions.scala, and add the check to the affected operators.
- Handle the expression that is a TimeZoneAwareExpression but takes a timezone parameter.

Signed-off-by: Chong Gao [email protected]