diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 4fff5d66d..f4359c4a6 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -20,7 +20,7 @@ import java.io.{OutputStream, PrintStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+import scala.concurrent.duration.NANOSECONDS
 
 import org.apache.commons.io.output.TeeOutputStream
 
@@ -39,9 +39,8 @@ import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
 class Benchmark(
     name: String,
     valuesPerIteration: Long,
-    minNumIters: Int = 2,
-    warmupTime: FiniteDuration = 2.seconds,
-    minTime: FiniteDuration = 2.seconds,
+    minNumIters: Int,
+    warmUpIterations: Int,
     outputPerIteration: Boolean = false,
     output: Option[OutputStream] = None) {
   import Benchmark._
@@ -88,9 +87,12 @@ class Benchmark(
   def run(): Unit = {
     require(benchmarks.nonEmpty)
     // scalastyle:off
+    println("-" * 80)
     println("Running benchmark: " + name)
+    println("-" * 80)
     val results = benchmarks.map { c =>
-      println("  Running case: " + c.name)
+      println("  RUNNING CASE : " + c.name)
+      println("-" * 80)
       measure(valuesPerIteration, c.numIters)(c.fn)
     }
     println
@@ -99,17 +101,15 @@
     // The results are going to be processor specific so it is useful to include that.
     out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
     val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
-    out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
-      name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
+    out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
+      name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Relative")
     out.println("-" * (nameLen + 80))
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+      out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
         benchmark.name,
         "%5.0f" format result.bestMs,
         "%4.0f" format result.avgMs,
         "%5.0f" format result.stdevMs,
-        "%10.1f" format result.bestRate,
-        "%6.1f" format (1000 / result.bestRate),
         "%3.1fX" format (firstBest / result.bestMs))
     }
     out.println()
@@ -121,16 +121,13 @@
    */
   def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
     System.gc()  // ensures garbage from previous cases don't impact this one
-    val warmupDeadline = warmupTime.fromNow
-    while (!warmupDeadline.isOverdue) {
+    for (wi <- 0 until warmUpIterations) {
       f(new ToolsTimer(-1))
     }
     val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
-    val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
     val runTimes = ArrayBuffer[Long]()
     var totalTime = 0L
-    var i = 0
-    while (i < minIters || totalTime < minDuration) {
+    for (i <- 0 until minIters) {
       val timer = new ToolsTimer(i)
       f(timer)
       val runTime = timer.totalTime()
@@ -139,13 +136,16 @@
 
       if (outputPerIteration) {
         // scalastyle:off
+        println("*"*80)
         println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+        println("*"*80)
         // scalastyle:on
       }
-      i += 1
     }
     // scalastyle:off
-    println(s"  Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+    println("*"*80)
+    println(s"  Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+    println("*"*80)
     // scalastyle:on
     assert(runTimes.nonEmpty)
     val best = runTimes.min
@@ -153,12 +153,12 @@
     val stdev = if (runTimes.size > 1) {
       math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
     } else 0
-    Benchmark.Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+    Benchmark.Result(avg / 1000000.0, best / 1000000.0, stdev / 1000000.0)
   }
 }
 
 object Benchmark {
   case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
-  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+  case class Result(avgMs: Double, bestMs: Double, stdevMs: Double)
 }
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
new file mode 100644
index 000000000..a7c0e99ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import org.rogach.scallop.{ScallopConf, ScallopOption}
+
+class BenchmarkArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
+
+  banner("""
+Benchmarker class for running various benchmarks.
+  """)
+
+  val iterations: ScallopOption[Int] = opt[Int](short = 'i', default = Some(5),
+    descr = "Total iterations to run excluding warmup (for avg time calculation)." +
+      " Default is 5 iterations.", validate = _ > 0)
+  val warmupIterations: ScallopOption[Int] = opt[Int](short = 'w',
+    default = Some(3), descr = "Total number of warmup iterations to run. Can take " +
+      "any input >= 0. Warmup is important for benchmarking to ensure that initial " +
+      "JVM operations (class loading etc.) do not skew the results.", validate = _ >= 0)
+  val outputDirectory: ScallopOption[String] = opt[String](short = 'o',
+    default = Some("."), descr = "Base output directory for benchmark results. " +
+      "Default is the current directory. The final output will go into a subdirectory called" +
+      " rapids-tools-benchmark. It will overwrite any existing directory with the same name.")
+  val outputFormat: ScallopOption[String] = opt[String](short = 'f',
+    default = Some("text"), descr = "Output format for the benchmark results. For text," +
+      " the output will be tabular. For json, the results" +
+      " will be JSON formatted. Currently supported formats are text and json.")
+  val extraArgs: ScallopOption[String] = opt[String](short = 'a',
+    required = false,
+    descr = "Extra arguments to pass to the benchmark. These arguments will be sent to the " +
+      "benchmark class. The format is space-separated arguments. For example: " +
+      "--output-directory /tmp --per-sql /tmp/eventlogs")
+  verify()
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 400cdedc7..a2e52cd70 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -31,7 +31,10 @@ abstract class BenchmarkBase {
    * Implementations of this method are supposed to use the wrapper method `runBenchmark`
    * for each benchmark scenario.
    */
-  def runBenchmarkSuite(mainArgs: Array[String]): Unit
+  def runBenchmarkSuite(iterations: Int,
+      warmUpIterations: Int,
+      outputFormat: String,
+      mainArgs: Array[String]): Unit
 
   final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
     val separator = "=" * 96
@@ -49,8 +52,9 @@
   def afterAll(): Unit = {}
 
   def main(args: Array[String]): Unit = {
-    // TODO: get the dirRoot from the arguments instead
-    val dirRoot = ""
+
+    val benchArgs = new BenchmarkArgs(args)
+    val dirRoot = benchArgs.outputDirectory().stripSuffix("/")
     val resultFileName = "results.txt"
     val dir = new File(s"$dirRoot/$prefix/")
     if (!dir.exists()) {
@@ -61,7 +65,10 @@
       file.createNewFile()
     }
     output = Some(new FileOutputStream(file))
-    runBenchmarkSuite(args)
+    runBenchmarkSuite(benchArgs.iterations(),
+      benchArgs.warmupIterations(),
+      benchArgs.outputFormat(),
+      benchArgs.extraArgs().split("\\s+").filter(_.nonEmpty))
     output.foreach { o =>
       if (o != null) {
         o.close()
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
new file mode 100644
index 000000000..17712739f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
+import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
+
+/**
+ * This class is used to run the QualificationMain class as a benchmark.
+ * It can be used as a reference for writing new benchmark classes.
+ * Usage:
+ * 1. Override the runBenchmarkSuite method.
+ * 2. Wrap each benchmark scenario in runBenchmark and construct a Benchmark inside it.
+ * 3. Register the code to be measured with addCase and call run().
+ */
+object QualificationBenchmark extends BenchmarkBase {
+  override def runBenchmarkSuite(iterations: Int,
+      warmUpIterations: Int,
+      outputFormat: String,
+      mainArgs: Array[String]): Unit = {
+    runBenchmark("QualificationBenchmark") {
+      val benchmarker =
+        new Benchmark(
+          "QualificationBenchmark",
+          2,
+          output = output,
+          outputPerIteration = true,
+          warmUpIterations = warmUpIterations,
+          minNumIters = iterations)
+      benchmarker.addCase("QualificationBenchmark") { _ =>
+        mainInternal(new QualificationArgs(mainArgs),
+          printStdout = true, enablePB = true)
+      }
+      benchmarker.run()
+    }
+  }
+}
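
For reference (not part of the patch), below is a minimal sketch of how another benchmark could be written against this API, following the usage steps in the QualificationBenchmark scaladoc. The object name SampleBenchmark, the summation workload, and the jar path in the comment are hypothetical placeholders; only the BenchmarkBase/Benchmark/addCase/run APIs and the -i/-w/-o/-f/-a options introduced in this diff are assumed.

package org.apache.spark.rapids.tool.benchmarks

// Hypothetical example benchmark. It would be launched through BenchmarkBase.main, e.g.:
//   java -cp <tools jar> org.apache.spark.rapids.tool.benchmarks.SampleBenchmark \
//     -i 5 -w 3 -o . -f text -a "<args forwarded to the measured code>"
object SampleBenchmark extends BenchmarkBase {

  // Step 1: override runBenchmarkSuite; BenchmarkBase.main passes in the parsed CLI values.
  override def runBenchmarkSuite(iterations: Int,
      warmUpIterations: Int,
      outputFormat: String,
      mainArgs: Array[String]): Unit = {
    // Step 2: wrap the scenario in runBenchmark so its output lands in the results.txt
    // file created by BenchmarkBase.main.
    runBenchmark("SampleBenchmark") {
      val benchmarker = new Benchmark(
        "SampleBenchmark",
        2, // valuesPerIteration; no longer reported now that the Rate/Per Row columns are gone
        output = output,
        outputPerIteration = true,
        warmUpIterations = warmUpIterations,
        minNumIters = iterations)
      // Step 3: register the code to measure with addCase, then run all cases.
      benchmarker.addCase("sum one million ints") { _ =>
        (1 to 1000000).sum
      }
      benchmarker.run()
    }
  }
}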