From de59e2ee5f8160199b1eaed134f2e78bfe220a78 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Mon, 29 Jan 2024 14:11:15 +0800
Subject: [PATCH] Add perf test for time zone operators (#10260)

Signed-off-by: Chong Gao
---
 .../rapids/timezone/TimeZonePerfSuite.scala | 112 +++++++++++++++++-
 1 file changed, 110 insertions(+), 2 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala
index 288bb38653f..d3388c68931 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/timezone/TimeZonePerfSuite.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.types._
  * - c_long_of_ts: long value which is microseconds
  * - c_date: date column
  * - c_int_of_date:int value which is days from 1970-01-01
+ * - c_long_of_ts_seconds: long value of seconds since the epoch
  * - c_str_for_cast: strings for cast to timestamp, formats are yyyy, yyyy-mm, ...
  * - c_str_of_ts: strings with format: yyyy-MM-dd HH:mm:ss
  * Each column is high duplicated.
@@ -100,11 +101,14 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
   def createDF(spark: SparkSession): DataFrame = {
     val id = col("id")
     val tsArray = Array[Long](year1980, year2000, year2030)
+    val secondsArray = tsArray.map(e => e / 1000000L)
     val dateArray = Array[Int](0, 100, 200)
     val columns = Array[Column](
       TimeZonePerfUtils.createColumn(id, TimestampType, TsGenFunc(tsArray)).alias("c_ts"),
       TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(tsArray)).alias("c_long_of_ts"),
       TimeZonePerfUtils.createColumn(id, DateType, DateGenFunc(dateArray)).alias("c_date"),
+      TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(secondsArray))
+          .alias("c_long_of_ts_seconds"),
       TimeZonePerfUtils.createColumn(id, IntegerType, DateGenFunc(dateArray))
           .alias("c_int_of_date"),
       TimeZonePerfUtils.createColumn(id, StringType, StringGenFunc(stringsForCast))
@@ -137,7 +141,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
       val startOnCpu = System.nanoTime()
       withCpuSparkSession(
         spark => func(spark, zoneStr).collect(),
-        conf)
+        // set session time zone
+        conf.set("spark.sql.session.timeZone", zoneStr))
       val endOnCpu = System.nanoTime()
       val elapseOnCpuMS = (endOnCpu - startOnCpu) / 1000000L
       if (i != 1) {
@@ -148,7 +153,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
       val startOnGpu = System.nanoTime()
       withGpuSparkSession(
         spark => func(spark, zoneStr).collect(),
-        conf)
+        // set session time zone
+        conf.set("spark.sql.session.timeZone", zoneStr))
       val endOnGpu = System.nanoTime()
       val elapseOnGpuMS = (endOnGpu - startOnGpu) / 1000000L
       if (i != 1) {
@@ -197,4 +203,106 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
 
     runAndRecordTime("to_utc_timestamp", perfTest)
   }
+
+  test("test hour") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.hour(functions.col("c_ts"))
+      ))
+    }
+
+    runAndRecordTime("hour",
+      perfTest)
+  }
+
+  test("test minute") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.minute(functions.col("c_ts"))
+      ))
+    }
+
+    runAndRecordTime("minute",
+      perfTest)
+  }
+
+  test("test second") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.second(functions.col("c_ts"))
+      ))
+    }
+
+    runAndRecordTime("second",
+      perfTest)
+  }
+
+  test("test unix_timestamp") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.unix_timestamp(functions.col("c_str_of_ts"))
+      ))
+    }
+
+    runAndRecordTime("unix_timestamp",
+      perfTest)
+  }
+
+  test("test from_unixtime") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.from_unixtime(functions.col("c_long_of_ts_seconds"))
+      ))
+    }
+
+    runAndRecordTime("from_unixtime",
+      perfTest)
+  }
+
+  test("test date_format") {
+    assume(enablePerfTest)
+
+    // cache time zone DB in advance
+    GpuTimeZoneDB.cacheDatabase()
+    Thread.sleep(5L)
+
+    def perfTest(spark: SparkSession, zone: String): DataFrame = {
+      spark.read.parquet(path).select(functions.count(
+        functions.date_format(functions.col("c_ts"), "yyyy-MM-dd HH:mm:ss")
+      ))
+    }
+
+    runAndRecordTime("date_format",
+      perfTest)
+  }
 }
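
Note (reviewer sketch, not part of the patch): the new c_long_of_ts_seconds column divides the microsecond values by 1,000,000 because Spark's from_unixtime expects seconds since the epoch and renders the result according to spark.sql.session.timeZone, which runAndRecordTime now sets per zone. A minimal, self-contained illustration of that behavior follows; the object name, master setting, time zone, and sample values are assumptions for the sketch only.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  object FromUnixtimeSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[1]")
        .appName("from_unixtime-sketch")
        // the session time zone drives how from_unixtime renders the string
        .config("spark.sql.session.timeZone", "Asia/Shanghai")
        .getOrCreate()
      import spark.implicits._

      val micros = 946684800000000L   // 2000-01-01 00:00:00 UTC, in microseconds
      val seconds = micros / 1000000L // the same instant in seconds, as createDF stores it

      // from_unixtime takes seconds; passing the microsecond value instead
      // would yield a date roughly 30 million years in the future
      Seq(seconds).toDF("c_long_of_ts_seconds")
        .select(from_unixtime(col("c_long_of_ts_seconds")).alias("rendered"))
        .show(false) // prints 2000-01-01 08:00:00 under UTC+8

      spark.stop()
    }
  }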
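
For reference, the timing pattern in runAndRecordTime (visible in the hunks above) wraps each CPU and GPU pass in System.nanoTime and only records iterations where i != 1, so the first pass serves as a warm-up for JIT compilation and the time zone cache. Below is a standalone sketch of that pattern; the helper name timeIt and the iteration count are assumptions, not the suite's API.

  object TimingSketch {
    // Run `body` `iterations` times and return elapsed milliseconds for
    // every pass except the first, which only warms up code and caches.
    def timeIt[T](iterations: Int)(body: => T): Seq[Long] =
      (1 to iterations).flatMap { i =>
        val start = System.nanoTime()
        body
        val elapsedMs = (System.nanoTime() - start) / 1000000L
        if (i != 1) Some(elapsedMs) else None
      }

    def main(args: Array[String]): Unit = {
      val samples = timeIt(4) { Thread.sleep(10L) }
      println(s"recorded (ms): ${samples.mkString(", ")}")
    }
  }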