Skip to content

Commit

Permalink
Add perf test for time zone operators
Browse files Browse the repository at this point in the history
Signed-off-by: Chong Gao <[email protected]>
  • Loading branch information
Chong Gao committed Jan 24, 2024
1 parent 1acc8e0 commit 5b4c494
Showing 1 changed file with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.types._
* - c_long_of_ts: long value which is microseconds
* - c_date: date column
* - c_int_of_date:int value which is days from 1970-01-01
* - c_long_of_ts_seconds: long values of seconds from epoch
* - c_str_for_cast: strings for cast to timestamp, formats are yyyy, yyyy-mm, ...
* - c_str_of_ts: strings with format: yyyy-MM-dd HH:mm:ss
* Each column is high duplicated.
Expand Down Expand Up @@ -100,11 +101,14 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
def createDF(spark: SparkSession): DataFrame = {
val id = col("id")
val tsArray = Array[Long](year1980, year2000, year2030)
val secondsArray = tsArray.map(e => e / 1000000L)
val dateArray = Array[Int](0, 100, 200)
val columns = Array[Column](
TimeZonePerfUtils.createColumn(id, TimestampType, TsGenFunc(tsArray)).alias("c_ts"),
TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(tsArray)).alias("c_long_of_ts"),
TimeZonePerfUtils.createColumn(id, DateType, DateGenFunc(dateArray)).alias("c_date"),
TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(secondsArray))
.alias("c_long_of_ts_seconds"),
TimeZonePerfUtils.createColumn(id, IntegerType, DateGenFunc(dateArray))
.alias("c_int_of_date"),
TimeZonePerfUtils.createColumn(id, StringType, StringGenFunc(stringsForCast))
Expand Down Expand Up @@ -137,7 +141,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
val startOnCpu = System.nanoTime()
withCpuSparkSession(
spark => func(spark, zoneStr).collect(),
conf)
// set session time zone
conf.set("spark.sql.session.timeZone", zoneStr))
val endOnCpu = System.nanoTime()
val elapseOnCpuMS = (endOnCpu - startOnCpu) / 1000000L
if (i != 1) {
Expand All @@ -148,7 +153,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl
val startOnGpu = System.nanoTime()
withGpuSparkSession(
spark => func(spark, zoneStr).collect(),
conf)
// set session time zone
conf.set("spark.sql.session.timeZone", zoneStr))
val endOnGpu = System.nanoTime()
val elapseOnGpuMS = (endOnGpu - startOnGpu) / 1000000L
if (i != 1) {
Expand Down Expand Up @@ -197,4 +203,106 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAl

runAndRecordTime("to_utc_timestamp", perfTest)
}

test("test hour") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.hour(functions.col("c_ts"))
))
}

runAndRecordTime("hour",
perfTest)
}

test("test minute") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.minute(functions.col("c_ts"))
))
}

runAndRecordTime("minute",
perfTest)
}

test("test second") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.second(functions.col("c_ts"))
))
}

runAndRecordTime("second",
perfTest)
}

test("test unix_timestamp") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.unix_timestamp(functions.col("c_ts"))
))
}

runAndRecordTime("unix_timestamp",
perfTest)
}

test("test from_unixtime") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.from_unixtime(functions.col("c_long_of_ts_seconds"))
))
}

runAndRecordTime("from_unixtime",
perfTest)
}

test("test date_format") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.date_format(functions.col("c_ts"), "yyyy-MM-dd HH:mm:ss")
))
}

runAndRecordTime("date_format",
perfTest)
}
}

0 comments on commit 5b4c494

Please sign in to comment.