diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py
index 1d984193f9e..d68dd93efac 100644
--- a/integration_tests/src/main/python/date_time_test.py
+++ b/integration_tests/src/main/python/date_time_test.py
@@ -286,22 +286,31 @@ def test_from_utc_timestamp(data_gen, time_zone):
         lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))
 
 @allow_non_gpu('ProjectExec')
-@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST"], ids=idfn)
+@pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "EST", "MST", "VST", "PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
-def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
+def test_from_utc_timestamp_non_utc_fallback(data_gen, time_zone):
     assert_gpu_fallback_collect(
         lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
         'FromUTCTimestamp')
 
+@allow_non_gpu('ProjectExec')
+@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
+@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
+def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
+    assert_gpu_fallback_collect(
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
+        'FromUTCTimestamp',
+        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
+
 @pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
 @pytest.mark.xfail(condition = is_not_utc(), reason = 'xfail non-UTC time zone tests because of https://github.com/NVIDIA/spark-rapids/issues/9653')
 def test_from_utc_timestamp_supported_timezones(data_gen, time_zone):
-    # Remove spark.rapids.test.CPU.timezone configuration when GPU kernel is ready to really test on GPU
+    # TODO: Remove spark.rapids.sql.nonUTC.enabled configuration
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.test.CPU.timezone": "true"})
-
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
+        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
 
 @allow_non_gpu('ProjectExec')
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 8119e78d988..fa9346f0ef4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -621,6 +621,10 @@ object GpuOverrides extends Logging {
     }
   }
 
+  def isUTCTimezone(timezoneId: ZoneId): Boolean = {
+    timezoneId.normalized() == UTC_TIMEZONE_ID
+  }
+
   def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_))
 
   /**
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
index 224a530bf99..18cbd7a26bb 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 
 import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner}
 import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import org.apache.commons.lang3.exception.ExceptionUtils
 
@@ -504,6 +505,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
   }
 
   override def shutdown(): Unit = {
+    GpuTimeZoneDB.shutdown()
     GpuSemaphore.shutdown()
     PythonWorkerSemaphore.shutdown()
     GpuDeviceManager.shutdown()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index ffc37cebb72..9abaccfa0f6 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2044,6 +2044,13 @@ object RapidsConf {
       "The gpu to disk spill bounce buffer must have a positive size")
     .createWithDefault(128L * 1024 * 1024)
 
+  val NON_UTC_TIME_ZONE_ENABLED =
+    conf("spark.rapids.sql.nonUTC.enabled")
+      .doc("An option to enable/disable non-UTC time zone support.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val SPLIT_UNTIL_SIZE_OVERRIDE = conf("spark.rapids.sql.test.overrides.splitUntilSize")
     .doc("Only for tests: override the value of GpuDeviceManager.splitUntilSize")
     .internal()
@@ -2056,12 +2063,6 @@ object RapidsConf {
     .booleanConf
     .createOptional
 
-  val TEST_USE_TIMEZONE_CPU_BACKEND = conf("spark.rapids.test.CPU.timezone")
-    .doc("Only for tests: verify for timezone related functions")
-    .internal()
-    .booleanConf
-    .createOptional
-
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
@@ -2754,6 +2755,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
 
   lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE)
 
+  lazy val nonUTCTimeZoneEnabled: Boolean = get(NON_UTC_TIME_ZONE_ENABLED)
+
   private val optimizerDefaults = Map(
     // this is not accurate because CPU projections do have a cost due to appending values
    // to each row that is produced, but this needs to be a really small number because
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala
index 2b1b8e6576b..91c1928cc00 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeZoneDB.scala
@@ -20,15 +20,10 @@ import java.time.ZoneId
 
 import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector}
 import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.GpuOverrides
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 object TimeZoneDB {
-  def isUTCTimezone(timezoneId: ZoneId): Boolean = {
-    timezoneId.normalized() == GpuOverrides.UTC_TIMEZONE_ID
-  }
-
   // Copied from Spark. Used to format time zone ID string with (+|-)h:mm and (+|-)hh:m
   def getZoneId(timezoneId: String): ZoneId = {
     val formattedZoneId = timezoneId
@@ -40,7 +35,7 @@ object TimeZoneDB {
   }
 
   // Support fixed offset or no transition rule case
-  def isSupportedTimezone(timezoneId: String): Boolean = {
+  def isSupportedTimeZone(timezoneId: String): Boolean = {
     val rules = getZoneId(timezoneId).getRules
     rules.isFixedOffset || rules.getTransitionRules.isEmpty
   }
@@ -153,7 +148,7 @@ object TimeZoneDB {
     assert(inputVector.getType == DType.TIMESTAMP_DAYS)
     val rowCount = inputVector.getRowCount.toInt
     withResource(inputVector.copyToHost()) { input =>
-      withResource(HostColumnVector.builder(DType.INT64, rowCount)) { builder =>
+      withResource(HostColumnVector.builder(DType.TIMESTAMP_MICROSECONDS, rowCount)) { builder =>
         var currRow = 0
         while (currRow < rowCount) {
           if (input.isNull(currRow)) {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
index 191b39b4bba..8f6c591787f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
@@ -20,11 +20,11 @@ import java.time.ZoneId
 import java.util.concurrent.TimeUnit
 
 import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar}
-import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
+import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
 import com.nvidia.spark.rapids.Arm._
 import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
-import com.nvidia.spark.rapids.RapidsConf.TEST_USE_TIMEZONE_CPU_BACKEND
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
 import com.nvidia.spark.rapids.shims.ShimBinaryExpression
 
 import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, FromUTCTimestamp, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression}
@@ -1046,7 +1046,7 @@ class FromUTCTimestampExprMeta(
   extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) {
 
   private[this] var timezoneId: ZoneId = null
-  private[this] val isOnCPU: Boolean = conf.get(TEST_USE_TIMEZONE_CPU_BACKEND).getOrElse(false)
+  private[this] val nonUTCEnabled: Boolean = conf.nonUTCTimeZoneEnabled
 
   override def tagExprForGpu(): Unit = {
     extractStringLit(expr.right) match {
@@ -1054,12 +1054,11 @@ class FromUTCTimestampExprMeta(
         willNotWorkOnGpu("timezone input must be a literal string")
       case Some(timezoneShortID) =>
         if (timezoneShortID != null) {
-          timezoneId = TimeZoneDB.getZoneId(timezoneShortID)
+          timezoneId = GpuTimeZoneDB.getZoneId(timezoneShortID)
           // Always pass for UTC timezone since it's no-op.
-          if (!TimeZoneDB.isUTCTimezone(timezoneId)) {
-            // Check CPU path, mostly for test purpose
-            if (isOnCPU) {
-              if(!TimeZoneDB.isSupportedTimezone(timezoneShortID)) {
+          if (!GpuOverrides.isUTCTimezone(timezoneId)) {
+            if (nonUTCEnabled) {
+              if(!GpuTimeZoneDB.isSupportedTimeZone(timezoneShortID)) {
                 willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.")
               }
             } else {
@@ -1072,11 +1071,11 @@ class FromUTCTimestampExprMeta(
   }
 
   override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression =
-    GpuFromUTCTimestamp(timestamp, timezone, timezoneId, isOnCPU)
+    GpuFromUTCTimestamp(timestamp, timezone, timezoneId, nonUTCEnabled)
 }
 
 case class GpuFromUTCTimestamp(
-    timestamp: Expression, timezone: Expression, zoneId: ZoneId, isOnCPU: Boolean)
+    timestamp: Expression, timezone: Expression, zoneId: ZoneId, nonUTCEnabled: Boolean)
   extends GpuBinaryExpressionArgsAnyScalar
     with ImplicitCastInputTypes
     with NullIntolerant {
@@ -1088,12 +1087,12 @@ case class GpuFromUTCTimestamp(
 
   override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
     if (rhs.getBase.isValid) {
-      if (TimeZoneDB.isUTCTimezone(zoneId)) {
+      if (GpuOverrides.isUTCTimezone(zoneId)) {
         // For UTC timezone, just a no-op bypassing GPU computation.
         lhs.getBase.incRefCount()
       } else {
-        if (isOnCPU){
-          TimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
+        if (nonUTCEnabled){
+          GpuTimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
         } else {
           // TODO: remove this until GPU backend supported.
           throw new UnsupportedOperationException(
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala
index 20013cde64d..dcfbc508034 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZoneSuite.scala
@@ -25,13 +25,22 @@ import scala.collection.mutable
 import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector}
 import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.SparkQueryCompareTestSuite
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToInstant
 import org.apache.spark.sql.rapids.TimeZoneDB
 import org.apache.spark.sql.types._
 
-class TimeZoneSuite extends SparkQueryCompareTestSuite {
+class TimeZoneSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll {
+  private val useGPU = true
+  private val testAllTimezones = false
+  private val testAllYears = false
+
+  private var zones = Seq.empty[String]
+
   /**
    * create timestamp column vector
    */
@@ -92,13 +101,24 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite {
   /**
    * assert timestamp result with Spark result
    */
-  def assertTimestampRet(actualRet: ColumnVector, sparkRet: Seq[Row]): Unit = {
+  def assertTimestampRet(actualRet: ColumnVector, sparkRet: Seq[Row], input: ColumnVector): Unit = {
     withResource(actualRet.copyToHost()) { host =>
-      assert(actualRet.getRowCount == sparkRet.length)
-      for (i <- 0 until actualRet.getRowCount.toInt) {
-        val sparkInstant = sparkRet(i).getInstant(0)
-        val sparkMicro = sparkInstant.getEpochSecond * 1000000L + sparkInstant.getNano / 1000L
-        assert(host.getLong(i) == sparkMicro)
+      withResource(input.copyToHost()) { hostInput =>
+        assert(actualRet.getRowCount == sparkRet.length)
+        for (i <- 0 until actualRet.getRowCount.toInt) {
+          val sparkInstant = sparkRet(i).getInstant(0)
+          val sparkMicro = sparkInstant.getEpochSecond * 1000000L + sparkInstant.getNano / 1000L
+          if (hostInput.getType == DType.TIMESTAMP_DAYS) {
+            assert(host.getLong(i) == sparkMicro,
+              s"for ${hostInput.getInt(i)} " +
+                s"${microsToInstant(host.getLong(i))} != ${microsToInstant(sparkMicro)}")
+
+          } else {
+            assert(host.getLong(i) == sparkMicro,
+              s"for ${hostInput.getLong(i)} (${microsToInstant(hostInput.getLong(i))}) " +
+                s"${microsToInstant(host.getLong(i))} != ${microsToInstant(sparkMicro)}")
+          }
+        }
       }
     }
   }
@@ -161,18 +181,24 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite {
       .set("spark.sql.datetime.java8API.enabled", "true"))
 
     // get result from TimeZoneDB
-    val actualRet = withResource(createColumnVector(epochSeconds)) { inputCv =>
-      TimeZoneDB.fromUtcTimestampToTimestamp(
-        inputCv,
-        ZoneId.of(zoneStr))
+    withResource(createColumnVector(epochSeconds)) { inputCv =>
+      val actualRet = if (useGPU) {
+        GpuTimeZoneDB.fromUtcTimestampToTimestamp(
+          inputCv,
+          ZoneId.of(zoneStr))
+      } else {
+        TimeZoneDB.fromUtcTimestampToTimestamp(
+          inputCv,
+          ZoneId.of(zoneStr))
+      }
+      withResource(actualRet) { _ =>
+        assertTimestampRet(actualRet, sparkRet, inputCv)
+      }
     }
-    withResource(actualRet) { _ =>
-      assertTimestampRet(actualRet, sparkRet)
-    }
   }
 
-  def testFromTimestampToUTCTimestamp(epochSeconds: Array[Long], zoneStr: String): Unit = {
+  def testFromTimestampToUtcTimestamp(epochSeconds: Array[Long], zoneStr: String): Unit = {
     // get result from Spark
     val sparkRet = withCpuSparkSession(
       spark => {
@@ -190,15 +216,21 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite {
       .set("spark.sql.datetime.java8API.enabled", "true"))
 
     // get result from TimeZoneDB
-    val actualRet = withResource(createColumnVector(epochSeconds)) { inputCv =>
-      TimeZoneDB.fromTimestampToUtcTimestamp(
-        inputCv,
-        ZoneId.of(zoneStr))
+    withResource(createColumnVector(epochSeconds)) { inputCv =>
+      val actualRet = if (useGPU) {
+        GpuTimeZoneDB.fromTimestampToUtcTimestamp(
+          inputCv,
+          ZoneId.of(zoneStr))
+      } else {
+        TimeZoneDB.fromTimestampToUtcTimestamp(
+          inputCv,
+          ZoneId.of(zoneStr))
+      }
+      withResource(actualRet) { _ =>
+        assertTimestampRet(actualRet, sparkRet, inputCv)
+      }
     }
-    withResource(actualRet) { _ =>
-      assertTimestampRet(actualRet, sparkRet)
-    }
   }
 
   def testFromTimestampToDate(epochSeconds: Array[Long], zoneStr: String): Unit = {
@@ -246,15 +278,15 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite {
       .set("spark.sql.datetime.java8API.enabled", "true"))
 
     // get result from TimeZoneDB
-    val actualRet = withResource(createDateColumnVector(epochDays)) { inputCv =>
-      TimeZoneDB.fromDateToTimestamp(
+    withResource(createDateColumnVector(epochDays)) { inputCv =>
+      val actualRet = TimeZoneDB.fromDateToTimestamp(
         inputCv,
         ZoneId.of(zoneStr))
+      withResource(actualRet) { _ =>
+        assertTimestampRet(actualRet, sparkRet, inputCv)
+      }
     }
-    withResource(actualRet) { _ =>
-      assertTimestampRet(actualRet, sparkRet)
-    }
   }
 
   def selectWithRepeatZones: Seq[String] = {
@@ -267,36 +299,75 @@ class TimeZoneSuite extends SparkQueryCompareTestSuite {
     repeatZones.slice(0, 2) ++ mustZones
   }
 
-  def selectNonRepeatZones: Seq[String] = {
+  def selectTimeZones: Seq[String] = {
     val mustZones = Array[String]("Asia/Shanghai", "America/Sao_Paulo")
-    val nonRepeatZones = ZoneId.getAvailableZoneIds.asScala.toList.filter { z =>
-      val rules = ZoneId.of(z).getRules
-      // remove this line when we support repeat rules
-      (rules.isFixedOffset || rules.getTransitionRules.isEmpty) && !mustZones.contains(z)
+    if (testAllTimezones) {
+      val nonRepeatZones = ZoneId.getAvailableZoneIds.asScala.toList.filter { z =>
+        val rules = ZoneId.of(z).getRules
+        // remove this line when we support repeat rules
+        (rules.isFixedOffset || rules.getTransitionRules.isEmpty) && !mustZones.contains(z)
+      }
+      scala.util.Random.shuffle(nonRepeatZones)
+      nonRepeatZones.slice(0, 2) ++ mustZones
+    } else {
+      mustZones
     }
-    scala.util.Random.shuffle(nonRepeatZones)
-    nonRepeatZones.slice(0, 2) ++ mustZones
   }
 
-  test("test all time zones") {
-    assume(false,
-      "It's time consuming for test all time zones, by default it's disabled")
+  override def beforeAll(): Unit = {
+    zones = selectTimeZones
+  }
 
-    val zones = selectNonRepeatZones
-    // iterate zones
+  override def afterAll(): Unit = {
+    if (useGPU) {
+      GpuTimeZoneDB.shutdown()
+    }
+  }
+
+  test("test timestamp to utc timestamp") {
+    for (zoneStr <- zones) {
+      // iterate years
+      val startYear = if (testAllYears) 1 else 1899
+      val endYear = if (testAllYears) 9999 else 2030
+      for (year <- startYear until endYear by 7) {
+        val epochSeconds = getEpochSeconds(year, year + 1)
+        testFromTimestampToUtcTimestamp(epochSeconds, zoneStr)
+      }
+    }
+  }
+
+  test("test utc timestamp to timestamp") {
     for (zoneStr <- zones) {
       // iterate years
-      val startYear = 1
-      val endYear = 9999
+      val startYear = if (testAllYears) 1 else 1899
+      val endYear = if (testAllYears) 9999 else 2030
       for (year <- startYear until endYear by 7) {
         val epochSeconds = getEpochSeconds(year, year + 1)
         testFromUtcTimeStampToTimestamp(epochSeconds, zoneStr)
-        testFromTimestampToUTCTimestamp(epochSeconds, zoneStr)
-        testFromTimestampToDate(epochSeconds, zoneStr)
       }
+    }
+  }
+
+  test("test timestamp to date") {
+    for (zoneStr <- zones) {
+      // iterate years
+      val startYear = if (testAllYears) 1 else 1899
+      val endYear = if (testAllYears) 9999 else 2030
+      for (year <- startYear until endYear by 7) {
+        val epochSeconds = getEpochSeconds(year, year + 1)
+        testFromTimestampToDate(epochSeconds, zoneStr)
+      }
+    }
+  }
+
+  test("test date to timestamp") {
+    for (zoneStr <- zones) {
+      // iterate years
+      val startYear = if (testAllYears) 1 else 1899
+      val endYear = if (testAllYears) 9999 else 2030
       val epochDays = getEpochDays(startYear, endYear)
       testFromDateToTimestamp(epochDays, zoneStr)
     }
   }
 }
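
Reviewer note, outside the patch itself: a minimal smoke-test sketch of how the pieces above fit together end to end. The object name NonUtcSmokeTest and the standalone session setup are hypothetical; spark.rapids.sql.nonUTC.enabled is the internal flag introduced in RapidsConf above (default false), and com.nvidia.spark.SQLPlugin is the plugin's standard entry point.

// Illustrative only -- not part of this diff. Names and setup are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_utc_timestamp}

object NonUtcSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("non-utc-smoke-test")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      // Internal flag added by this patch; defaults to false.
      .config("spark.rapids.sql.nonUTC.enabled", "true")
      .getOrCreate()

    // One timestamp per hour starting at the epoch.
    val df = spark.range(10).selectExpr("timestamp_micros(id * 3600000000) AS ts")

    // Asia/Shanghai has no recurring DST rules, so isSupportedTimeZone returns true
    // and FromUTCTimestamp stays on the GPU when the flag is on.
    df.select(from_utc_timestamp(col("ts"), "Asia/Shanghai")).show(truncate = false)

    // America/Los_Angeles has DST transition rules, so the expression is tagged
    // willNotWorkOnGpu and the projection falls back to the CPU.
    df.select(from_utc_timestamp(col("ts"), "America/Los_Angeles")).show(truncate = false)

    spark.stop()
  }
}

With the flag off, FromUTCTimestampExprMeta tags every non-UTC zone for fallback; with it on, zones whose java.time rules are a fixed offset or carry no recurring transition rules run through GpuTimeZoneDB, while DST zones still fall back, which is what test_from_utc_timestamp_unsupported_timezone_fallback asserts above. Setting spark.rapids.sql.explain to NOT_ON_GPU should make the fallback visible in the driver log.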