From 1fed95a548c2ca32e65055cc20da54e6f2549ec2 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 15 Nov 2023 13:12:46 -0800 Subject: [PATCH 1/8] Updates to test suite to start using GpuTimeZoneDB Signed-off-by: Navin Kumar --- .../spark/rapids/timezone/TimeZoneDB.scala | 5 +- .../spark/rapids/timezone/TimeZoneSuite.scala | 167 +++++++++++++----- 2 files changed, 124 insertions(+), 48 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala index e7c4585d679..ab95461a453 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids.timezone import java.time.ZoneId +import java.util.concurrent.Executor import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} import com.nvidia.spark.rapids.Arm.withResource @@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils object TimeZoneDB { - def cacheDatabase(): Unit = {} + def cacheDatabase(e: Executor): Unit = {} /** * Interpret a timestamp as a time in the given time zone, @@ -121,7 +122,7 @@ object TimeZoneDB { assert(inputVector.getType == DType.TIMESTAMP_DAYS) val rowCount = inputVector.getRowCount.toInt withResource(inputVector.copyToHost()) { input => - withResource(HostColumnVector.builder(DType.INT64, rowCount)) { builder => + withResource(HostColumnVector.builder(DType.TIMESTAMP_MICROSECONDS, rowCount)) { builder => var currRow = 0 while (currRow < rowCount) { val origin = input.getInt(currRow) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala index bdb532e6399..6668b44fda1 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala @@ -18,19 +18,27 @@ package com.nvidia.spark.rapids.timezone import java.time._ import java.util.concurrent.TimeUnit - import scala.collection.JavaConverters._ import scala.collection.mutable - import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.SparkQueryCompareTestSuite - +import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToInstant import org.apache.spark.sql.types._ +import org.scalatest.BeforeAndAfterAll + +import java.util.concurrent.Executors + +class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll { + private val useGPU = true + private val testAllTimezones = false + private val testAllYears = false + + private var zones = Seq.empty[String] -class TimeZoneSuite extends SparkQueryCompareTestSuite { /** * create timestamp column vector */ @@ -91,13 +99,24 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite { /** * assert timestamp result with Spark result */ - def assertTimestampRet(actualRet: ColumnVector, sparkRet: Seq[Row]): Unit = { + def assertTimestampRet(actualRet: ColumnVector, sparkRet: Seq[Row], input: ColumnVector): Unit = { withResource(actualRet.copyToHost()) { host => - assert(actualRet.getRowCount == sparkRet.length) - for (i <- 0 until actualRet.getRowCount.toInt) { - val sparkInstant = sparkRet(i).getInstant(0) - val 
sparkMicro = sparkInstant.getEpochSecond * 1000000L + sparkInstant.getNano / 1000L - assert(host.getLong(i) == sparkMicro) + withResource(input.copyToHost()) { hostInput => + assert(actualRet.getRowCount == sparkRet.length) + for (i <- 0 until actualRet.getRowCount.toInt) { + val sparkInstant = sparkRet(i).getInstant(0) + val sparkMicro = sparkInstant.getEpochSecond * 1000000L + sparkInstant.getNano / 1000L + if (hostInput.getType == DType.TIMESTAMP_DAYS) { + assert(host.getLong(i) == sparkMicro, + s"for ${hostInput.getInt(i)} " + + s"${microsToInstant(host.getLong(i))} != ${microsToInstant(sparkMicro)}") + + } else { + assert(host.getLong(i) == sparkMicro, + s"for ${hostInput.getLong(i)} (${microsToInstant(hostInput.getLong(i))}) " + + s"${microsToInstant(host.getLong(i))} != ${microsToInstant(sparkMicro)}") + } + } } } } @@ -160,18 +179,24 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite { .set("spark.sql.datetime.java8API.enabled", "true")) // get result from TimeZoneDB - val actualRet = withResource(createColumnVector(epochSeconds)) { inputCv => - TimeZoneDB.fromUtcTimestampToTimestamp( - inputCv, - ZoneId.of(zoneStr)) + withResource(createColumnVector(epochSeconds)) { inputCv => + val actualRet = if (useGPU) { + GpuTimeZoneDB.fromUtcTimestampToTimestamp( + inputCv, + ZoneId.of(zoneStr)) + } else { + TimeZoneDB.fromUtcTimestampToTimestamp( + inputCv, + ZoneId.of(zoneStr)) + } + withResource(actualRet) { _ => + assertTimestampRet(actualRet, sparkRet, inputCv) + } } - withResource(actualRet) { _ => - assertTimestampRet(actualRet, sparkRet) - } } - def testFromTimestampToUTCTimestamp(epochSeconds: Array[Long], zoneStr: String): Unit = { + def testFromTimestampToUtcTimestamp(epochSeconds: Array[Long], zoneStr: String): Unit = { // get result from Spark val sparkRet = withCpuSparkSession( spark => { @@ -189,15 +214,21 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite { .set("spark.sql.datetime.java8API.enabled", "true")) // get result from TimeZoneDB - val actualRet = withResource(createColumnVector(epochSeconds)) { inputCv => - TimeZoneDB.fromTimestampToUtcTimestamp( - inputCv, - ZoneId.of(zoneStr)) + withResource(createColumnVector(epochSeconds)) { inputCv => + val actualRet = if (useGPU) { + GpuTimeZoneDB.fromTimestampToUtcTimestamp( + inputCv, + ZoneId.of(zoneStr)) + } else { + TimeZoneDB.fromTimestampToUtcTimestamp( + inputCv, + ZoneId.of(zoneStr)) + } + withResource(actualRet) { _ => + assertTimestampRet(actualRet, sparkRet, inputCv) + } } - withResource(actualRet) { _ => - assertTimestampRet(actualRet, sparkRet) - } } def testFromTimestampToDate(epochSeconds: Array[Long], zoneStr: String): Unit = { @@ -245,15 +276,15 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite { .set("spark.sql.datetime.java8API.enabled", "true")) // get result from TimeZoneDB - val actualRet = withResource(createDateColumnVector(epochDays)) { inputCv => - TimeZoneDB.fromDateToTimestamp( + withResource(createDateColumnVector(epochDays)) { inputCv => + val actualRet = TimeZoneDB.fromDateToTimestamp( inputCv, ZoneId.of(zoneStr)) + withResource(actualRet) { _ => + assertTimestampRet(actualRet, sparkRet, inputCv) + } } - withResource(actualRet) { _ => - assertTimestampRet(actualRet, sparkRet) - } } def selectWithRepeatZones: Seq[String] = { @@ -266,36 +297,80 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite { repeatZones.slice(0, 2) ++ mustZones } - def selectNonRepeatZones: Seq[String] = { + def selectTimeZones: Seq[String] = { val mustZones = Array[String]("Asia/Shanghai", 
"America/Sao_Paulo") - val nonRepeatZones = ZoneId.getAvailableZoneIds.asScala.toList.filter { z => - val rules = ZoneId.of(z).getRules - // remove this line when we support repeat rules - (rules.isFixedOffset || rules.getTransitionRules.isEmpty) && !mustZones.contains(z) + if (testAllTimezones) { + val nonRepeatZones = ZoneId.getAvailableZoneIds.asScala.toList.filter { z => + val rules = ZoneId.of(z).getRules + // remove this line when we support repeat rules + (rules.isFixedOffset || rules.getTransitionRules.isEmpty) && !mustZones.contains(z) + } + scala.util.Random.shuffle(nonRepeatZones) + nonRepeatZones.slice(0, 2) ++ mustZones + } else { + mustZones + } + } + + override def beforeAll(): Unit = { + if (useGPU) { + GpuTimeZoneDB.cacheDatabase(Executors.newCachedThreadPool()) + } else { + TimeZoneDB.cacheDatabase(Executors.newCachedThreadPool()) + } + zones = selectTimeZones + } + + override def afterAll(): Unit = { + if (useGPU) { + GpuTimeZoneDB.unload() } - scala.util.Random.shuffle(nonRepeatZones) - nonRepeatZones.slice(0, 2) ++ mustZones } - test("test all time zones") { - assume(false, - "It's time consuming for test all time zones, by default it's disabled") + test("test timestamp to utc timestamp") { + for (zoneStr <- zones) { + // iterate years + val startYear = if (testAllYears) 1 else 1899 + val endYear = if (testAllYears) 9999 else 2030 + for (year <- startYear until endYear by 7) { + val epochSeconds = getEpochSeconds(year, year + 1) + testFromTimestampToUtcTimestamp(epochSeconds, zoneStr) + } + } + } - val zones = selectNonRepeatZones - // iterate zones + test("test utc timestamp to timestamp") { for (zoneStr <- zones) { // iterate years - val startYear = 1 - val endYear = 9999 + val startYear = if (testAllYears) 1 else 1899 + val endYear = if (testAllYears) 9999 else 2030 for (year <- startYear until endYear by 7) { val epochSeconds = getEpochSeconds(year, year + 1) testFromUtcTimeStampToTimestamp(epochSeconds, zoneStr) - testFromTimestampToUTCTimestamp(epochSeconds, zoneStr) - testFromTimestampToDate(epochSeconds, zoneStr) } + } + } + test("test timestamp to date") { + for (zoneStr <- zones) { + // iterate years + val startYear = if (testAllYears) 1 else 1899 + val endYear = if (testAllYears) 9999 else 2030 + for (year <- startYear until endYear by 7) { + val epochSeconds = getEpochSeconds(year, year + 1) + testFromTimestampToDate(epochSeconds, zoneStr) + } + } + } + + test("test date to timestamp") { + for (zoneStr <- zones) { + // iterate years + val startYear = if (testAllYears) 1 else 1899 + val endYear = if (testAllYears) 9999 else 2030 val epochDays = getEpochDays(startYear, endYear) testFromDateToTimestamp(epochDays, zoneStr) } } + } From bb3e8e052e63d320990121c76ef64ec4787ef7c7 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 15 Nov 2023 23:53:46 -0800 Subject: [PATCH 2/8] Initialize the GPU timezone database in the plugin initialization for executor, use this in tests Signed-off-by: Navin Kumar --- .../scala/com/nvidia/spark/rapids/Plugin.scala | 6 +++--- .../spark/rapids/timezone/TimeZoneDB.scala | 3 +-- .../spark/rapids/timezone/TimeZoneSuite.scala | 17 +++++------------ 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index 6520ff4c1b7..3fb7a979c7d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -19,16 +19,14 @@ 
package com.nvidia.spark.rapids import java.lang.reflect.InvocationTargetException import java.time.ZoneId import java.util.Properties - import scala.collection.JavaConverters._ import scala.sys.process._ import scala.util.Try - import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner} import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg} +import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.commons.lang3.exception.ExceptionUtils - import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, TaskFailedReason} import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.internal.Logging @@ -381,6 +379,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { s"Driver timezone is $driverTimezone and executor timezone is " + s"$executorTimezone. Set executor timezone to $driverTimezone.") } + GpuTimeZoneDB.cacheDatabase() } GpuCoreDumpHandler.executorInit(conf, pluginContext) @@ -503,6 +502,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { + GpuTimeZoneDB.shutdown() GpuSemaphore.shutdown() PythonWorkerSemaphore.shutdown() GpuDeviceManager.shutdown() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala index ab95461a453..e5d399fb7f8 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneDB.scala @@ -17,7 +17,6 @@ package com.nvidia.spark.rapids.timezone import java.time.ZoneId -import java.util.concurrent.Executor import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} import com.nvidia.spark.rapids.Arm.withResource @@ -26,7 +25,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils object TimeZoneDB { - def cacheDatabase(e: Executor): Unit = {} + def cacheDatabase(): Unit = {} /** * Interpret a timestamp as a time in the given time zone, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala index 6668b44fda1..c37f4ec6040 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala @@ -18,19 +18,20 @@ package com.nvidia.spark.rapids.timezone import java.time._ import java.util.concurrent.TimeUnit + import scala.collection.JavaConverters._ import scala.collection.mutable + import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.SparkQueryCompareTestSuite import com.nvidia.spark.rapids.jni.GpuTimeZoneDB +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToInstant import org.apache.spark.sql.types._ -import org.scalatest.BeforeAndAfterAll - -import java.util.concurrent.Executors class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll { private val useGPU = true @@ -313,17 +314,9 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll { } override def beforeAll(): Unit = { - if (useGPU) { - 
GpuTimeZoneDB.cacheDatabase(Executors.newCachedThreadPool())
-    } else {
-      TimeZoneDB.cacheDatabase(Executors.newCachedThreadPool())
-    }
     zones = selectTimeZones
-  }
-
-  override def afterAll(): Unit = {
     if (useGPU) {
-      GpuTimeZoneDB.unload()
+      withGpuSparkSession(_ => { })
     }
   }
 

From 88113203a9ca0c42b1b50cd6265f9f06a2f0ecf0 Mon Sep 17 00:00:00 2001
From: Navin Kumar 
Date: Mon, 20 Nov 2023 18:07:07 -0800
Subject: [PATCH 3/8] Remove GpuTimeZoneDB init from the plugin for now; add
 it back behind a config flag once the first expression using it is
 completed.

Signed-off-by: Navin Kumar 
---
 .../src/main/scala/com/nvidia/spark/rapids/Plugin.scala  | 3 ---
 .../com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala | 8 +++++++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
index 3fb7a979c7d..b17ff8ba3dd 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -24,7 +24,6 @@ import scala.sys.process._
 import scala.util.Try
 import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner}
 import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
-import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, TaskFailedReason}
@@ -379,7 +378,6 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
         s"Driver timezone is $driverTimezone and executor timezone is " +
         s"$executorTimezone. 
Set executor timezone to $driverTimezone.") } - GpuTimeZoneDB.cacheDatabase() } GpuCoreDumpHandler.executorInit(conf, pluginContext) @@ -502,7 +500,6 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { - GpuTimeZoneDB.shutdown() GpuSemaphore.shutdown() PythonWorkerSemaphore.shutdown() GpuDeviceManager.shutdown() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala index 38597651391..cc6e643591a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala @@ -317,7 +317,13 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { zones = selectTimeZones if (useGPU) { - withGpuSparkSession(_ => { }) + GpuTimeZoneDB.cacheDatabase() + } + } + + override def afterAll(): Unit = { + if (useGPU) { + GpuTimeZoneDB.shutdown() } } From 93f5660b833853da981b651f9c72112c2afa6073 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 20 Nov 2023 18:13:32 -0800 Subject: [PATCH 4/8] Add plugin initialization code and change config flag for non UTC timezone support Signed-off-by: Navin Kumar --- .../src/main/python/date_time_test.py | 4 ++-- .../scala/com/nvidia/spark/rapids/Plugin.scala | 7 +++++++ .../com/nvidia/spark/rapids/RapidsConf.scala | 15 +++++++++------ .../spark/sql/rapids/datetimeExpressions.scala | 3 +-- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 0d4f457f8e8..5aaf11b1192 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -281,9 +281,9 @@ def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone): @pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn) @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) def test_from_utc_timestamp_supported_timezones(data_gen, time_zone): - # Remove spark.rapids.test.CPU.timezone configuration when GPU kernel is ready to really test on GPU + # TODO: Remove spark.rapids.sql.nonUTC.enabled configuration assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.test.CPU.timezone": "true"}) + lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.sql.nonUTC.enabled": "true"}) @allow_non_gpu('ProjectExec') diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index b17ff8ba3dd..f1800a8a781 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -24,6 +24,7 @@ import scala.sys.process._ import scala.util.Try import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner} import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg} +import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, 
TaskFailedReason} @@ -378,6 +379,9 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { s"Driver timezone is $driverTimezone and executor timezone is " + s"$executorTimezone. Set executor timezone to $driverTimezone.") } + if (conf.nonUTCTimeZoneEnabled) { + GpuTimeZoneDB.cacheDatabase() + } } GpuCoreDumpHandler.executorInit(conf, pluginContext) @@ -500,6 +504,9 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { + if (conf.nonUTCTimeZoneEnabled) { + GpuTimeZoneDB.shutdown() + } GpuSemaphore.shutdown() PythonWorkerSemaphore.shutdown() GpuDeviceManager.shutdown() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index ffc37cebb72..9abaccfa0f6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2044,6 +2044,13 @@ object RapidsConf { "The gpu to disk spill bounce buffer must have a positive size") .createWithDefault(128L * 1024 * 1024) + val NON_UTC_TIME_ZONE_ENABLED = + conf("spark.rapids.sql.nonUTC.enabled") + .doc("An option to enable/disable non-UTC time zone support.") + .internal() + .booleanConf + .createWithDefault(false) + val SPLIT_UNTIL_SIZE_OVERRIDE = conf("spark.rapids.sql.test.overrides.splitUntilSize") .doc("Only for tests: override the value of GpuDeviceManager.splitUntilSize") .internal() @@ -2056,12 +2063,6 @@ object RapidsConf { .booleanConf .createOptional - val TEST_USE_TIMEZONE_CPU_BACKEND = conf("spark.rapids.test.CPU.timezone") - .doc("Only for tests: verify for timezone related functions") - .internal() - .booleanConf - .createOptional - private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -2754,6 +2755,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE) + lazy val nonUTCTimeZoneEnabled: Boolean = get(NON_UTC_TIME_ZONE_ENABLED) + private val optimizerDefaults = Map( // this is not accurate because CPU projections do have a cost due to appending values // to each row that is produced, but this needs to be a really small number because diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 191b39b4bba..98f07f6fb0b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -23,7 +23,6 @@ import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} -import com.nvidia.spark.rapids.RapidsConf.TEST_USE_TIMEZONE_CPU_BACKEND import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.ShimBinaryExpression @@ -1046,7 +1045,7 @@ class FromUTCTimestampExprMeta( extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) { private[this] var timezoneId: ZoneId = null - private[this] val isOnCPU: Boolean = 
conf.get(TEST_USE_TIMEZONE_CPU_BACKEND).getOrElse(false) + private[this] val isOnCPU: Boolean = conf.nonUTCTimeZoneEnabled override def tagExprForGpu(): Unit = { extractStringLit(expr.right) match { From 1d4184dee5828bd2370681f0e33f78f5912148d6 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 20 Nov 2023 22:29:00 -0800 Subject: [PATCH 5/8] Use GpuTimeZoneDB to run from_utc_timestamp on the GPU for non-UTC non-DST timezones Signed-off-by: Navin Kumar --- .../nvidia/spark/rapids/GpuOverrides.scala | 4 ++++ .../com/nvidia/spark/rapids/Plugin.scala | 7 +----- .../apache/spark/sql/rapids/TimeZoneDB.scala | 7 +----- .../sql/rapids/datetimeExpressions.scala | 24 +++++++++---------- .../spark/rapids/timezone/TimeZoneSuite.scala | 3 --- 5 files changed, 18 insertions(+), 27 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index bdeae65a975..89969c01725 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -621,6 +621,10 @@ object GpuOverrides extends Logging { } } + def isUTCTimezone(timezoneId: ZoneId): Boolean = { + timezoneId.normalized() == UTC_TIMEZONE_ID + } + def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_)) /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index f1800a8a781..d07e2b1bc3f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -379,9 +379,6 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { s"Driver timezone is $driverTimezone and executor timezone is " + s"$executorTimezone. Set executor timezone to $driverTimezone.") } - if (conf.nonUTCTimeZoneEnabled) { - GpuTimeZoneDB.cacheDatabase() - } } GpuCoreDumpHandler.executorInit(conf, pluginContext) @@ -504,9 +501,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging { } override def shutdown(): Unit = { - if (conf.nonUTCTimeZoneEnabled) { - GpuTimeZoneDB.shutdown() - } + GpuTimeZoneDB.shutdown() GpuSemaphore.shutdown() PythonWorkerSemaphore.shutdown() GpuDeviceManager.shutdown() diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala index 94cab7792ba..91c1928cc00 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala @@ -20,15 +20,10 @@ import java.time.ZoneId import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} import com.nvidia.spark.rapids.Arm.withResource -import com.nvidia.spark.rapids.GpuOverrides import org.apache.spark.sql.catalyst.util.DateTimeUtils object TimeZoneDB { - def isUTCTimezone(timezoneId: ZoneId): Boolean = { - timezoneId.normalized() == GpuOverrides.UTC_TIMEZONE_ID - } - // Copied from Spark. 
Used to format time zone ID string with (+|-)h:mm and (+|-)hh:m def getZoneId(timezoneId: String): ZoneId = { val formattedZoneId = timezoneId @@ -40,7 +35,7 @@ object TimeZoneDB { } // Support fixed offset or no transition rule case - def isSupportedTimezone(timezoneId: String): Boolean = { + def isSupportedTimeZone(timezoneId: String): Boolean = { val rules = getZoneId(timezoneId).getRules rules.isFixedOffset || rules.getTransitionRules.isEmpty } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 98f07f6fb0b..8f6c591787f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -20,10 +20,11 @@ import java.time.ZoneId import java.util.concurrent.TimeUnit import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar} -import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import com.nvidia.spark.rapids.shims.ShimBinaryExpression import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, FromUTCTimestamp, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} @@ -1045,7 +1046,7 @@ class FromUTCTimestampExprMeta( extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) { private[this] var timezoneId: ZoneId = null - private[this] val isOnCPU: Boolean = conf.nonUTCTimeZoneEnabled + private[this] val nonUTCEnabled: Boolean = conf.nonUTCTimeZoneEnabled override def tagExprForGpu(): Unit = { extractStringLit(expr.right) match { @@ -1053,12 +1054,11 @@ class FromUTCTimestampExprMeta( willNotWorkOnGpu("timezone input must be a literal string") case Some(timezoneShortID) => if (timezoneShortID != null) { - timezoneId = TimeZoneDB.getZoneId(timezoneShortID) + timezoneId = GpuTimeZoneDB.getZoneId(timezoneShortID) // Always pass for UTC timezone since it's no-op. 
- if (!TimeZoneDB.isUTCTimezone(timezoneId)) { - // Check CPU path, mostly for test purpose - if (isOnCPU) { - if(!TimeZoneDB.isSupportedTimezone(timezoneShortID)) { + if (!GpuOverrides.isUTCTimezone(timezoneId)) { + if (nonUTCEnabled) { + if(!GpuTimeZoneDB.isSupportedTimeZone(timezoneShortID)) { willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.") } } else { @@ -1071,11 +1071,11 @@ class FromUTCTimestampExprMeta( } override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression = - GpuFromUTCTimestamp(timestamp, timezone, timezoneId, isOnCPU) + GpuFromUTCTimestamp(timestamp, timezone, timezoneId, nonUTCEnabled) } case class GpuFromUTCTimestamp( - timestamp: Expression, timezone: Expression, zoneId: ZoneId, isOnCPU: Boolean) + timestamp: Expression, timezone: Expression, zoneId: ZoneId, nonUTCEnabled: Boolean) extends GpuBinaryExpressionArgsAnyScalar with ImplicitCastInputTypes with NullIntolerant { @@ -1087,12 +1087,12 @@ case class GpuFromUTCTimestamp( override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = { if (rhs.getBase.isValid) { - if (TimeZoneDB.isUTCTimezone(zoneId)) { + if (GpuOverrides.isUTCTimezone(zoneId)) { // For UTC timezone, just a no-op bypassing GPU computation. lhs.getBase.incRefCount() } else { - if (isOnCPU){ - TimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId) + if (nonUTCEnabled){ + GpuTimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId) } else { // TODO: remove this until GPU backend supported. throw new UnsupportedOperationException( diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala index cc6e643591a..dcfbc508034 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala @@ -316,9 +316,6 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { zones = selectTimeZones - if (useGPU) { - GpuTimeZoneDB.cacheDatabase() - } } override def afterAll(): Unit = { From 031c6c916febc8dabc5c23f033bd5cd309f3671c Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 22 Nov 2023 10:20:28 -0800 Subject: [PATCH 6/8] Fix formatting of includes here Signed-off-by: Navin Kumar --- sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala index d07e2b1bc3f..692ee38a48c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala @@ -19,14 +19,17 @@ package com.nvidia.spark.rapids import java.lang.reflect.InvocationTargetException import java.time.ZoneId import java.util.Properties + import scala.collection.JavaConverters._ import scala.sys.process._ import scala.util.Try + import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner} import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg} import com.nvidia.spark.rapids.jni.GpuTimeZoneDB import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.commons.lang3.exception.ExceptionUtils + import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, TaskFailedReason} import org.apache.spark.api.plugin.{DriverPlugin, 
ExecutorPlugin, PluginContext, SparkPlugin} import org.apache.spark.internal.Logging From e5deb4ada37ea2a26065b3c6321cd6d2c22f7adb Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 27 Nov 2023 16:33:44 -0800 Subject: [PATCH 7/8] Add some Olson timezones to fallback test and add config option to test proper fallback Signed-off-by: Navin Kumar --- integration_tests/src/main/python/date_time_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 9be5182bf08..2340ff75000 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -286,12 +286,13 @@ def test_from_utc_timestamp(data_gen, time_zone): lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone))) @allow_non_gpu('ProjectExec') -@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST"], ids=idfn) +@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn) @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone): assert_gpu_fallback_collect( lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), - 'FromUTCTimestamp') + 'FromUTCTimestamp', + conf = {"spark.rapids.sql.nonUTC.enabled": "true"}) @pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn) @@ -300,8 +301,8 @@ def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone): def test_from_utc_timestamp_supported_timezones(data_gen, time_zone): # TODO: Remove spark.rapids.sql.nonUTC.enabled configuration assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.sql.nonUTC.enabled": "true"}) - + lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), + conf = {"spark.rapids.sql.nonUTC.enabled": "true"}) @allow_non_gpu('ProjectExec') @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) From 74dcf39eff6e0d635de4fada4b049c1345dcaddb Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 27 Nov 2023 16:37:25 -0800 Subject: [PATCH 8/8] Add fallback test for when config option is not enabled Signed-off-by: Navin Kumar --- integration_tests/src/main/python/date_time_test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 2340ff75000..d68dd93efac 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -285,6 +285,14 @@ def test_from_utc_timestamp(data_gen, time_zone): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone))) +@allow_non_gpu('ProjectExec') +@pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "EST", "MST", "VST", "PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn) +@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn) +def test_from_utc_timestamp_non_utc_fallback(data_gen, time_zone): + assert_gpu_fallback_collect( + lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), + 
'FromUTCTimestamp') + @allow_non_gpu('ProjectExec') @pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn) @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
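
Usage sketch (illustrative; not part of the patches): after PATCH 4, non-UTC
support for from_utc_timestamp is gated on the internal flag
spark.rapids.sql.nonUTC.enabled, and PATCH 8 verifies that the expression
falls back to ProjectExec whenever the flag is off. A minimal Scala sketch of
how the flag is expected to be exercised; the session setup and input data
are assumptions for illustration, while the config key, the plugin class, and
the supported zones ("UTC", "Asia/Shanghai", "EST", "MST", "VST") come from
the patches above:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, from_utc_timestamp}

  val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    // Internal flag added in PATCH 4; without it, any non-UTC zone falls
    // back to the CPU (see test_from_utc_timestamp_non_utc_fallback).
    .config("spark.rapids.sql.nonUTC.enabled", "true")
    .getOrCreate()
  import spark.implicits._

  val df = Seq("2023-11-15 21:12:46").toDF("ts")
    .select(col("ts").cast("timestamp").as("ts"))

  // "Asia/Shanghai" has no DST transition rules today, so it is eligible for
  // the GPU path; zones with transition rules (e.g. "America/Los_Angeles")
  // still fall back, which is what PATCH 7 adds to the fallback test.
  df.select(from_utc_timestamp(col("ts"), "Asia/Shanghai")).show(false)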
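
For reference, the conversion these patches move onto the GPU has simple
per-row semantics: treat the stored microseconds as a UTC instant and shift
them by the zone's offset at that instant, which is what the TimeZoneDB
helper computes on the CPU (via DateTimeUtils) and what TimeZoneSuite checks
against Spark. A minimal sketch of those semantics, assuming microsecond
timestamps as used throughout the suite; the helper name is illustrative,
not an API from the patches:

  import java.time.{Instant, ZoneId}

  // from_utc_timestamp contract: interpret `micros` (microseconds since
  // the epoch, in UTC) as an instant, then add the zone's offset at that
  // instant so the result reads as the target zone's wall-clock time.
  def fromUtcMicros(micros: Long, zone: ZoneId): Long = {
    val instant = Instant.ofEpochSecond(
      Math.floorDiv(micros, 1000000L),
      Math.floorMod(micros, 1000000L) * 1000L)
    micros + zone.getRules.getOffset(instant).getTotalSeconds * 1000000L
  }

  // e.g. fromUtcMicros(0L, ZoneId.of("Asia/Shanghai")) == 8L * 3600 * 1000000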