Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move timezone check to each operator [databricks] #9482

Closed

Conversation

res-life
Copy link
Collaborator

@res-life res-life commented Oct 19, 2023

closes #6832

Changes:

  1. Removed generic check
  2. Checked all the operators in datetimeExpressions.scala, and add check to operators:
  • GpuMinute
  • GpuSecond
  • GpuHour
  • DateAddInterval GpuDateAddInterval
  • GpuDateFormatClass
  • GpuToTimestamp abstract
  • GpuToTimestampImproved abstract
  • GpuUnixTimestamp
  • GpuToUnixTimestamp
  • GpuToUnixTimestampImproved
  • GpuGetTimestamp
  • GpuFromUnixTime
  • GpuFromUTCTimestamp Note: this is not a TimeZoneAwareExpression, but it takes a timezone parameter.
  • GpuSequence
  • Cast
  1. Add test cases to verify operators timezone check

Signed-off-by: Chong Gao [email protected]

@revans2
Copy link
Collaborator

revans2 commented Oct 19, 2023

It is more than just those operators. In spark time zone aware code is controlled by TimeZoneAwareExpression

Notably most of these are in datetimeExpressions.scala, but there are a number outside of it (This came from Spark 3.5.0 using IntelliJ to find all of the implementations of it).

  • Cast
  • CsvToStructs
  • CurrentBatchTimestamp
  • CurrentDate
  • DateAddInterval
  • DateFormatClass
  • FromUnixTime
  • GetTimeField
  • GetTimestamp
  • Hour
  • JsonToStructs
  • LocalTimestamp
  • MakeTimestamp
  • Minute
  • MonthsBetween
  • ParseToDate
  • ParseToTimestamp
  • Second
  • SecondWithFraction
  • Sequence
  • StructsToCsv
  • StructsToJson
  • SubtractTimestamps
  • TimeAdd
  • TimestampAdd
  • TimestampAddYMInterval
  • TimestampDiff
  • TimestampFormatterHelper
  • ToPrettyString
  • ToTimestamp
  • ToUnixTimestamp
  • TruncTimestamp
  • UnixTime
  • UnixTimestamp

Many of these we don't have GPU equivalents for, and in some cases we do have GPU versions. For all of the others we need to make sure that they are covered and if they don't support timestamps or don't need to worry about time zone for some reason we need to document it and preferably have a follow on issue filed to come back and implement the timestamp functionality if it is missing.

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are also some execs that use timezones that we might need to be careful of. Like CSV and JSON parsing.

We should make sure that we test all of our code with a different time zone, not just the handful of tests that we expect to fall back. Can we update the tests that we expect to not support a different time zone to skip themselves if the timezone is not what they expect? and for all of them to have a test that skips itself if the timezone is UTC, but verifies that we fallback to the CPU in those cases.

That should help us know exactly what we need to update to support timezones.

@@ -1733,6 +1762,11 @@ object GpuOverrides extends Logging {
TypeSig.TIMESTAMP, TypeSig.TIMESTAMP),
(second, conf, p, r) => new UnaryExprMeta[Second](second, conf, p, r) {

override def tagExprForGpu(): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we try and have a TimeZoneAwareExprMeta, or something similar that makes it super simple to do this? We might even be able to back it into ExprMeta itself, just by checking if the class that this wraps is also TimeZoneAware.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing the best approach is to put it directly in ExprMeta since otherwise we would have to mixin the TimeZoneAwareExprMeta for the different functions. I'm guessing that functions requiring timezone will span the gamut of Unary/Binary/Ternary/Quaternary/Agg/etc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe wrap the check in a method and override it whenever a function starts supporting alternate timezones.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like GpuCast will be a first exception to this idea: #6835

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm testing all the existing test cases with adding non-UTC time zone config to identify all the failed cases:

https://github.com/NVIDIA/spark-rapids/blob/v23.10.0/integration_tests/src/main/python/spark_session.py#L68-L74

def _set_all_confs(conf):
    _spark.conf.set("spark.sql.session.timeZone": "+08:00")

Then I'll update the failed cases.

@sameerz sameerz added the task Work required that improves the product but is not user facing label Oct 19, 2023
@@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one line break after license header.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -840,7 +839,7 @@ object TypeChecks {
areTimestampsSupported(ZoneId.systemDefault()) &&
areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra space.

return spark.createDataFrame(SparkContext.getOrCreate().parallelize(data),schema)

# used by timezone test cases, specify all the sqls that will be impacted by non-utc timezone
time_zone_sql_conf_pairs = [
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: There're some functions related to timezone (not supported yet), mentioned in Spark built-in function website. We can add some comments mentioning here.

convert_timezone
-- SELECT convert_timezone('Europe/Brussels', 'America/Los_Angeles', timestamp_ntz'2021-12-06 00:00:00');
current_timezone()
make_timestamp()
make_timestamp_ltz()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For current_timezone, it just returns the session timezone, we can ignore it for this PR.
Spark config "spark.sql.session.timeZone" can set this value.

For MakeTimestamp and ConvertTimezone, it's recorded in this follow on issue: #9570

pom.xml Outdated
@@ -1045,7 +1045,7 @@
<arg>-Yno-adapted-args</arg>
<arg>-Ywarn-unused:imports,locals,patvars,privates</arg>
<arg>-Xlint:missing-interpolator</arg>
<arg>-Xfatal-warnings</arg>
<!-- <arg>-Xfatal-warnings</arg> -->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Revert this back when we try to commit it.

@@ -374,13 +374,12 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
case Some(value) => ZoneId.of(value)
case None => throw new RuntimeException(s"Driver time zone cannot be determined.")
}
if (TypeChecks.areTimestampsSupported(driverTimezone)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may off-topic. Considering the configuration spark.sql.session.timeZone, should both driver and executor respect it? Then do we still need the check on driver and executor's timezone mismatch?

Copy link
Collaborator Author

@res-life res-life Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering the configuration spark.sql.session.timeZone, should both driver and executor respect it?

Here driverTimezone is from driver ZoneId.systemDefault(), not from spark.sql.session.timeZone, refer to: PR
Spark itself does not have this kind of check.

But for our spark-rapids, we check executor and driver have the same JVM time zone.

Then do we still need the check on driver and executor's timezone mismatch?

I think yes, becasue we want to avoid the issue

case TimestampType =>
TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
case TimestampType => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider the timezone check for scan and writer parts? AFAIK, when scanning data from Parquet, spark.sql.session.timeZone is supposed to be respect.

If applies, we should add some python tests as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is used by InternalColumnarRddConverter and HostToGpuCoalesceIterator.
Coalesce can handle non UTC timestamp. But not sure InternalColumnarRddConverter, seems it's also OK.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to check these. For me, anything that does not have a test that shows it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC.

Parquet for example has the rebase mode for older timestamps that requires knowing the timezone to do properly.

@res-life
Copy link
Collaborator Author

build

@@ -363,8 +363,7 @@ final class TypeSig private(
case FloatType => check.contains(TypeEnum.FLOAT)
case DoubleType => check.contains(TypeEnum.DOUBLE)
case DateType => check.contains(TypeEnum.DATE)
case TimestampType if check.contains(TypeEnum.TIMESTAMP) =>
TypeChecks.areTimestampsSupported()
Copy link
Collaborator Author

@res-life res-life Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally invoked by shuffle meta, FileFormatChecks, tag AST and other.

  • shuffle meta, it's safe to remove this check, because shuffle definitely support non utc timezone.
  • FileFormatChecks: Spark always write Parqeut with UTC timestamp, it's safe; For ORC, Spark map ORC type timestamp with local time zone to Spark type TIMESTAMP_NTZ (with no time zone). Now spark-rapids does not support TIMESTAMP_NTZ currently, so it's safe to remove the check. Refer to link
  • tag AST: Not sure if remove this UTC check is OK, need to investigate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@res-life
Copy link
Collaborator Author

Still WIP, need to check more exprs.

@res-life
Copy link
Collaborator Author

Filed a sub issue: #9570

@pytest.mark.parametrize('sql, extra_conf', time_zone_sql_conf_pairs)
def test_timezone_for_operators_with_non_utc(sql, extra_conf):
# timezone is non-utc, should fallback to CPU
timezone_conf = {"spark.sql.session.timeZone": "+08:00",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make the time zone string a param to the test? Just because I would like to test a few more time zones than just +08:00

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

case TimestampType =>
TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)
case TimestampType => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to check these. For me, anything that does not have a test that shows it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC.

Parquet for example has the rebase mode for older timestamps that requires knowing the timezone to do properly.

@res-life
Copy link
Collaborator Author

Added a pytest mark disable_timezone_test which means skip non UTC time zone tests.
By default, all the cases without disable_timezone_test will using non UTC timezone.
For the time zone aware test cases, added the disable_timezone_test.

@res-life
Copy link
Collaborator Author

build

@res-life
Copy link
Collaborator Author

I think we will need to check these. For me, anything that does not have a test that shows it works fully in at least one other time zone must fall back to the CPU if it sees a timestamp that is not UTC.
Parquet for example has the rebase mode for older timestamps that requires knowing the timezone to do properly.

Here is the checking of UTC for rebase mode for older timestamps.

    SparkShimImpl.parquetRebaseWrite(sqlConf) match {
      case "EXCEPTION" | "CORRECTED" => // Good
      case "LEGACY" =>
        if (!TypeChecks.areTimestampsSupported()) {
          meta.willNotWorkOnGpu("Only UTC timezone is supported in LEGACY rebase mode. " +
            s"Current timezone settings: (JVM : ${ZoneId.systemDefault()}, " +
            s"session: ${SQLConf.get.sessionLocalTimeZone}). " +
            " Set both of the timezones to UTC to enable LEGACY rebase support.")
        }

I think it's safe to remove the following in GpuOverrides.scala

      case TimestampType =>
        TypeChecks.areTimestampsSupported(ZoneId.systemDefault()) &&
        TypeChecks.areTimestampsSupported(SQLConf.get.sessionLocalTimeZone)

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I would like to see the following either in this PR or in a follow on PR.

  1. how the time zone testing is inserted into the CI runs so we can continue to verify that it works as expected.
  2. if there is a way for us to test more time zones than just UTC and Asia/Shanghai. Perhaps we can randomly select one from a list of them based off of a random seed that can be passed into the tests. But this should probably be a follow on issue.
  3. For all of the tests that are skipped because of the time zone issue an automated test that verifies that we fell back to the CPU for them instead of producing the wrong answer.

Number 3 I see as a blocker for us to ship a product with this patch in it. We cannot ship something with data corruption in it. I am fine with checking this in for now so long as we have manually verified that the tests failed because we fell back to the CPU. But I want automated tests ASAP.

@@ -354,7 +359,9 @@ def get_sequence_data(gen, len):
SparkContext.getOrCreate().parallelize(get_sequence_data(data_gen, length)),
mixed_schema)

@disable_timezone_test
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why two of these?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@res-life res-life changed the title [WIP] Move timezone check to each operator [WIP] Move timezone check to each operator [databricks] Nov 1, 2023
@res-life
Copy link
Collaborator Author

res-life commented Nov 1, 2023

build

@res-life
Copy link
Collaborator Author

res-life commented Nov 1, 2023

how the time zone testing is inserted into the CI runs so we can continue to verify that it works as expected.

Talked to Peixin, CI pass a Env variable contains the timezone , then I'll update the cases use this Env variable to set spark.sql.session.timeZone. For timezone aware expressions, it always use spark.sql.session.timeZone first.
Refer to link

object ResolveTimeZone extends Rule[LogicalPlan] {
  private val transformTimeZoneExprs: PartialFunction[Expression, Expression] = {
    case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
      e.withTimeZone(conf.sessionLocalTimeZone)

if there is a way for us to test more time zones than just UTC and Asia/Shanghai. Perhaps we can randomly select one from a list of them based off of a random seed that can be passed into the tests. But this should probably be a follow on issue.

Talked to Peixin, we'd better not use random timezone, random timezone will cause confusion.
We can enable non UTC test on some specific Spark version, like Spark31x, Spark 35x, not all the Spark versions considering the GPU resource limit and time limit.
Like adding the following scenario:

  • Asia/Shanghai on Spark31x, Spark 35x
  • PST on Spark31x, Spark 35x

@res-life
Copy link
Collaborator Author

res-life commented Nov 1, 2023

For all of the tests that are skipped because of the time zone issue an automated test that verifies that we fell back to the CPU for them instead of producing the wrong answer.
Number 3 I see as a blocker for us to ship a product with this patch in it. We cannot ship something with data corruption in it. I am fine with checking this in for now so long as we have manually verified that the tests failed because we fell back to the CPU. But I want automated tests ASAP.

About automate the tests to verify fall back to CPU.
For example:

@disable_timezone_test
@pytest.mark.parametrize('start_gen,stop_gen', sequence_normal_no_step_integral_gens, ids=idfn)
def test_sequence_without_step(start_gen, stop_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: two_col_df(spark, start_gen, stop_gen).selectExpr(
            "sequence(a, b)",
            "sequence(a, 20)",
            "sequence(20, b)"))

It's hard to automately generate a case like:

@pytest.mark.parametrize('start_gen,stop_gen', sequence_normal_no_step_integral_gens, ids=idfn)
def test_sequence_without_step(start_gen, stop_gen):
    assert_gpu_fallback_collect(
        lambda spark: two_col_df(spark, start_gen, stop_gen).selectExpr(
            "sequence(a, b)",
            "sequence(a, 20)",
            "sequence(20, b)"),
      cpu_fallback_class_name = "Sequence" # We should analyze the code and get the operator is "sequence"
)

And after we finished the timezone feature, this case should be removed.
I think we can currently manually verify them.

@res-life
Copy link
Collaborator Author

res-life commented Nov 1, 2023

Some tests failed on Databricks:

=========================== short test summary info ============================
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/hive_write_test.py::test_hive_copy_ints_to_long - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/hive_write_test.py::test_hive_copy_longs_to_float - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(StringType,[None])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(StringType,[None, ''])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(not_null)(StringType,[''])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(ArrayType(StringType,true),[None])] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(not_null)(ArrayType(StringType,true),[[]])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(not_null)(ArrayType(StringType,true),[['', '']])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(ArrayType(StringType,true),[None, [], [None], [''], [None, '']])] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.482Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(MapType(StringType,StringType,true),[{}, None, {'A': ''}, {'B': None}])] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(MapType(StringType,StringType,true),[None])] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_round_trip_corner[SetValues(not_null)(MapType(StringType,StringType,true),[{}])][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_compress_write_round_trip[none] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_compress_write_round_trip[uncompressed][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_compress_write_round_trip[snappy] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_compress_write_round_trip[zstd][INJECT_OOM] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_ts_write_twice_fails_exception - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_map_nullable - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_concurrent_writer[INJECT_OOM, IGNORE_ORDER] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_dynamic_partitioned_parquet_write[INJECT_OOM, IGNORE_ORDER({'local': True})] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_write_list_struct_single_element[INJECT_OOM, IGNORE_ORDER, ALLOW_NON_GPU(SortExec,ShuffleExchangeExec)] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] FAILED ../../src/main/python/parquet_write_test.py::test_parquet_write_column_name_with_dots[INJECT_OOM, IGNORE_ORDER] - pyspark.sql.utils.IllegalArgumentException: Part of the plan is not columna...
12:46:55  [2023-11-01T04:46:11.483Z] = 22 failed, 17231 passed, 3676 skipped, 846 xfailed, 286 xpassed, 112 warnings in 7429.48s (2:03:49) =

More info:

23/11/01 14:08:59 WARN GpuOverrides: 
!Exec <DataWritingCommandExec> cannot run on GPU because not all data writing commands can be replaced
  !Output <InsertIntoHadoopFsRelationCommand> cannot run on GPU because Only UTC timezone is supported in LEGACY rebase mode. Current timezone settings: (JVM : UTC, session: Asia/Shanghai).  Set both of the timezones to UTC to enable LEGACY rebase support.
  ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
    @Expression <AttributeReference> _c0#63 could run on GPU

@res-life
Copy link
Collaborator Author

res-life commented Nov 1, 2023

If I set rebase mode to "CORRECTED", then test_write_round_trip_corner case passed on Databricks.

+                 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
+                 'spark.sql.parquet.datetimeRebaseModeInRead': 'CORRECTED',
+                 'spark.sql.parquet.int96RebaseModeInWrite' : 'CORRECTED',
+                 'spark.sql.parquet.int96RebaseModeInRead' : 'CORRECTED'

Note sure why Databricks has different behavior.
I saw this code, link:

                 # set the int96 rebase mode values because its LEGACY in databricks which will preclude this op from running on GPU
                 'spark.sql.legacy.parquet.int96RebaseModeInWrite' : 'CORRECTED',
                 'spark.sql.legacy.parquet.int96RebaseModeInRead' : 'CORRECTED'}

Do not know why some test code should set extra config to make test cases pass.
@ttnghia , can you recall this?

@ttnghia
Copy link
Collaborator

ttnghia commented Nov 1, 2023

This is the reason:

  !Output <InsertIntoHadoopFsRelationCommand> cannot run on GPU because Only UTC timezone
 is supported in LEGACY rebase mode.
Current timezone settings: (JVM : UTC, session: Asia/Shanghai).

So you need to set your Spark session timezone to UTC too.

The CORRECTED option does not require UTC timezone, while LEGACY would need it. So if you run parquet test with LEGACY date time rebase mode, you must be in UTC.

On Databricks, the default mode is LEGACY thus you need to explicitly set it to CORRECTED if you are not doing testing for parquet datetime rebase.

@@ -204,25 +205,66 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
.json(data_path),
conf=updated_conf)

@allow_non_gpu('FileSourceScanExec', 'ProjectExec')
@allow_non_gpu('FileSourceScanExec', 'BatchScanExec')
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be xfail?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are testing fallback logic for non-UTC TZ. And in future will remove this.
For utc TZ, just skip is OK.

@res-life
Copy link
Collaborator Author

res-life commented Nov 15, 2023

Changes about checker:

  • bottom check, add config nonUtc.enabled. If disable non UTC TZ support, check UTC TZ as originally does.
  • Expression check: if it's time zone aware expr, check UTC TZ; Cast: check conversion between string,date,timestamp.
  • Exec check: skip the UTC check for all Execs. Because Expression will check, Exec no need to check.
  • For the fieformats: remove check for Parquet, remain for other file types.

Other changes:

  • Remove the skip test cases logic in data_gen.py if it's not UTC.
  • For databricks, set the rebase mode becasue DB's default rebase mode is different with regular Sparks.
  • For fallback cases:
   e.g.:
   xfail(is_non_utc, reason = 'Will update in future for non utc, so here use xfail to remind us')
   test_op()
       assert_equals
  
   skipif(is_utc, reason = 'skip it, becasue will remove this case in future. This is used to test fall back when it's non UTC')
   test_op_for_non_utc()
        assert_fall_back

@res-life
Copy link
Collaborator Author

After this commit, more failed cases occur:

FAILED ../../src/main/python/arithmetic_ops_test.py::test_greatest[Timestamp][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_item[Short-Array(Timestamp)]
FAILED ../../src/main/python/array_test.py::test_array_item[Long-Array(Timestamp)][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_item[Byte-Array(Timestamp)]
FAILED ../../src/main/python/array_test.py::test_array_item[Integer-Array(Timestamp)][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_item_lit_ordinal[Array(Timestamp)]
FAILED ../../src/main/python/array_test.py::test_orderby_array_unique[Array(Timestamp)]
FAILED ../../src/main/python/array_test.py::test_array_element_at[Array(Timestamp)][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_transform[Array(Timestamp)][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_repeat_with_count_column[Map(Timestamp(not_null),Timestamp)]
FAILED ../../src/main/python/array_test.py::test_array_repeat_with_count_column[Timestamp][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_repeat_with_count_scalar[Array(Timestamp)][INJECT_OOM]
FAILED ../../src/main/python/array_test.py::test_array_repeat_with_count_scalar[Struct(['child0', Byte],['child1', Short],['child2', Integer],['child3', Long],['child4', Float],['child5', Double],['child6', String],['child7', Boolean],['child8', Date],['child9', Timestamp],['child10', Null])][INJECT_OOM]

@res-life
Copy link
Collaborator Author

res-life commented Nov 15, 2023

My paln:

  1. Merge this Add time zone config to set non-UTC [databricks] #9652 to enable non-UTC CI. In this PR we remove the skipping tests when time zone is non-UTC, refer to the changes in data_gen.py
  2. Run non-UTC CI, XFail all the filed cases. I think the failed cases will be the cases that contain timestamp gen. In this step we should remove the global check when Spark-Rapids starts: link
      if (TypeChecks.areTimestampsSupported(driverTimezone)) {
        val executorTimezone = ZoneId.systemDefault()
        if (executorTimezone.normalized() != driverTimezone.normalized()) {
          throw new RuntimeException(s" Driver and executor timezone mismatch. " +
              s"Driver timezone is $driverTimezone and executor timezone is " +
              s"$executorTimezone. Set executor timezone to $driverTimezone.")
        }
      }

I'll update this PR to xfail all the failed cases.
3. Update checker(Expr, Exec) to add an option to skip UTC check.
4. Implement a non-UTC supported GPU expr, like GpuDateFormatClass, use the option in 3 to enable non-UTC. Add case to test GPU operator. In this step, we will also update some common expr/exec like AttributeReference, ShuffleExec, ProjectExec, FileterExec ......
5. Parallelly update the xfail case. When we support non-UTC for a new operator, remove the xfail. For other xfail cases, we should investigate or write fallback cases.

And please review these first:
#9652 This is for step 1
#9721 This is for step 3,4

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is kind of too much going on here. We are changing the DB tests to be consistent with the non-DB tests, but I don't have confidence that we did it right. We are splitting up the timezone checks in the plugin, and we are changing all of the tests to support testing in different time zones. Can we split this up into at least 3 separate PRs. Doing it all at once is too much for me to really follow.

@@ -158,6 +229,22 @@ def test_cast_string_ts_valid_format(data_gen):
conf = {'spark.rapids.sql.hasExtendedYearValues': 'false',
'spark.rapids.sql.castStringToTimestamp.enabled': 'true'})

@allow_non_gpu('ProjectExec')
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from StringType to TimeStampType")
@pytest.mark.parametrize('data_gen', [StringGen('[0-9]{1,4}-[0-9]{1,2}-[0-9]{1,2}'),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the fallback tests we don't need to do all of the combinations. We just need one to be sure that we are falling back.

@allow_non_gpu('ProjectExec')
@pytest.mark.skipif(is_before_spark_320(), reason="Spark versions(< 320) not support Ansi mode when casting string to date")
@pytest.mark.skipif(is_utc(), reason="TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from StringType to DateType")
def test_cast_string_date_valid_ansi_for_non_utc():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'm not sure that we need ANSI fallback tests for cast. The fallback tests can be far less than the validate we do the right thing tests. We just need to cover the operator/types when we expect a fallback to happen.

@@ -294,14 +381,37 @@ def _assert_cast_to_string_equal (data_gen, conf):
conf
)

# split all_array_gens_for_cast_to_string
# remove below split and merge tests: "TODO sub-issue in https://github.com/NVIDIA/spark-rapids/issues/9653 to support non-UTC tz for Cast from Date/Timestamp to String"
gens_for_non_utc_strs = [
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Again here I think we really only need Array(date) and Array(Timestamp). The rest are nice, but not required.

# currently does not support. On Databricks, the default datetime rebase mode is LEGACY,
# it's different from regular Spark. Some of the cases will fall if timezone is non-UTC on DB.
# The following configs is for DB and ensure the rebase mode is not LEGACY on DB.
writer_confs_for_DB = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't want this in here. I am fine if we xfail the DB write tests and point to why when the timezone is not UTC. But I don't want to change what we are testing unless we go through each test and verify that we are not losing coverage. This PR is already big enough. If you want to do this change it should be split off into another PR.

@@ -1728,11 +1726,11 @@ object GpuOverrides extends Logging {
GpuMinute(expr)
}),
expr[Second](

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not needed.

@@ -363,8 +363,7 @@ final class TypeSig private(
case FloatType => check.contains(TypeEnum.FLOAT)
case DoubleType => check.contains(TypeEnum.DOUBLE)
case DateType => check.contains(TypeEnum.DATE)
case TimestampType if check.contains(TypeEnum.TIMESTAMP) =>
TypeChecks.areTimestampsSupported()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not touched?

@NVnavkumar
Copy link
Collaborator

My paln:

1. Merge this [Add time zone config to set non-UTC [databricks] #9652](https://github.com/NVIDIA/spark-rapids/pull/9652) to enable non-UTC CI. In this PR we remove the skipping tests when time zone is non-UTC, refer to the changes in `data_gen.py`

2. Run non-UTC CI,  XFail all the filed cases. I think the failed cases will be the cases that contain timestamp gen. In this step we should remove the global check when Spark-Rapids starts: [link](https://github.com/NVIDIA/spark-rapids/blob/v23.10.0/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala#L377)
      if (TypeChecks.areTimestampsSupported(driverTimezone)) {
        val executorTimezone = ZoneId.systemDefault()
        if (executorTimezone.normalized() != driverTimezone.normalized()) {
          throw new RuntimeException(s" Driver and executor timezone mismatch. " +
              s"Driver timezone is $driverTimezone and executor timezone is " +
              s"$executorTimezone. Set executor timezone to $driverTimezone.")
        }
      }

I'll update this PR to xfail all the failed cases. 3. Update checker(Expr, Exec) to add an option to skip UTC check. 4. Implement a non-UTC supported GPU expr, like GpuDateFormatClass, use the option in 3 to enable non-UTC. Add case to test GPU operator. In this step, we will also update some common expr/exec like AttributeReference, ShuffleExec, ProjectExec, FileterExec ...... 5. Parallelly update the xfail case. When we support non-UTC for a new operator, remove the xfail. For other xfail cases, we should investigate or write fallback cases.

And please review these first: #9652 This is for step 1 #9721 This is for step 3,4

For 1 and 2 here, I'm filed #9737 to track this separately. I think this will be important. The issue is that we need to really ensure that there is no data corruption when merging many of the changes here.

@res-life
Copy link
Collaborator Author

res-life commented Nov 22, 2023

Replace this with #9719

@res-life res-life closed this Nov 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
task Work required that improves the product but is not user facing
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEA] Convert Timestamp/Timezone tests/checks to be per operator instead of generic
8 participants