Spark rapids tools 1120 fea args #11

Merged · 7 commits · Jul 2, 2024
@@ -20,7 +20,7 @@ import java.io.{OutputStream, PrintStream}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+import scala.concurrent.duration.NANOSECONDS

import org.apache.commons.io.output.TeeOutputStream

@@ -39,9 +39,8 @@ import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
class Benchmark(
    name: String,
    valuesPerIteration: Long,
-   minNumIters: Int = 2,
-   warmupTime: FiniteDuration = 2.seconds,
-   minTime: FiniteDuration = 2.seconds,
+   minNumIters: Int,
+   warmUpIterations: Int,
    outputPerIteration: Boolean = false,
    output: Option[OutputStream] = None) {
  import Benchmark._
@@ -88,9 +87,12 @@ class Benchmark(
def run(): Unit = {
  require(benchmarks.nonEmpty)
  // scalastyle:off
+ println("-" * 80)
  println("Running benchmark: " + name)
+ println("-" * 80)
  val results = benchmarks.map { c =>
-   println("  Running case: " + c.name)
+   println("  RUNNING CASE : " + c.name)
+   println("-" * 80)
    measure(valuesPerIteration, c.numIters)(c.fn)
  }
  println
@@ -99,17 +101,15 @@
  // The results are going to be processor specific so it is useful to include that.
  out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
  val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
- out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
-   name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
+ out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
+   name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Relative")
  out.println("-" * (nameLen + 80))
  results.zip(benchmarks).foreach { case (result, benchmark) =>
-   out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+   out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
      benchmark.name,
      "%5.0f" format result.bestMs,
      "%4.0f" format result.avgMs,
      "%5.0f" format result.stdevMs,
-     "%10.1f" format result.bestRate,
-     "%6.1f" format (1000 / result.bestRate),
      "%3.1fX" format (firstBest / result.bestMs))
  }
  out.println()
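
For context, the trimmed report now carries four metric columns instead of six. A sketch of the resulting layout (the values are illustrative, not from a real run):

  QualificationBenchmark:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Relative
  ------------------------------------------------------------------------------------------------
  QualificationBenchmark                             1043           1127          61       1.0X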
@@ -121,16 +121,13 @@
 */
def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
  System.gc() // ensures garbage from previous cases don't impact this one
- val warmupDeadline = warmupTime.fromNow
- while (!warmupDeadline.isOverdue) {
+ for (wi <- 0 until warmUpIterations) {
    f(new ToolsTimer(-1))
  }
  val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
- val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
  val runTimes = ArrayBuffer[Long]()
  var totalTime = 0L
- var i = 0
- while (i < minIters || totalTime < minDuration) {
+ for (i <- 0 until minIters) {
    val timer = new ToolsTimer(i)
    f(timer)
    val runTime = timer.totalTime()
@@ -139,26 +136,29 @@

  if (outputPerIteration) {
    // scalastyle:off
+   println("*"*80)
    println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+   println("*"*80)
    // scalastyle:on
  }
- i += 1
  }
  // scalastyle:off
- println(s"  Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ println("*"*80)
+ println(s"  Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ println("*"*80)
  // scalastyle:on
  assert(runTimes.nonEmpty)
  val best = runTimes.min
  val avg = runTimes.sum / runTimes.size
  val stdev = if (runTimes.size > 1) {
    math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
  } else 0
- Benchmark.Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+ Benchmark.Result(avg / 1000000.0, best / 1000000.0, stdev / 1000000.0)
}
}
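
For reference, the stdev branch kept above is the sample standard deviation of the per-iteration run times (computed in nanoseconds, reported in milliseconds):

  s = \sqrt{ \frac{1}{n-1} \sum_{i=1}^{n} (t_i - \bar{t})^2 }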


object Benchmark {
  case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
- case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+ case class Result(avgMs: Double, bestMs: Double, stdevMs: Double)
}
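
Taken together, these changes make the harness purely iteration-driven: no time-based warm-up or minimum-duration loop remains. A minimal usage sketch of the revised API (the addCase signature is inferred from its use in QualificationBenchmark below; the case body is a placeholder):

  // Sketch: iteration counts are now explicit constructor arguments.
  val bench = new Benchmark(
    "ExampleSuite",
    valuesPerIteration = 2,
    minNumIters = 5,        // measured iterations; replaces the old minTime loop bound
    warmUpIterations = 3)   // fixed warm-up count; replaces the warmupTime deadline
  bench.addCase("noop") { _ =>
    Thread.sleep(10)        // placeholder for the code under measurement
  }
  bench.run()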
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.tool.benchmarks

import org.rogach.scallop.{ScallopConf, ScallopOption}

class BenchmarkArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

banner("""
Benchmarker class for running various benchmarks.
""")

  val iterations: ScallopOption[Int] = opt[Int](short = 'i', default = Some(5),
    descr = "Total iterations to run excluding warmup (for avg time calculation)." +
      " Default is 5 iterations.", validate = _ > 0)
  val warmupIterations: ScallopOption[Int] = opt[Int](short = 'w',
    default = Some(3), descr = "Total number of warmup iterations to run. Can take " +
      "any input >= 0. Warm-up is important for benchmarking to ensure initial " +
      "JVM operations (class loading, etc.) do not skew the result.", validate = _ >= 0)
  val outputDirectory: ScallopOption[String] = opt[String](short = 'o',
    default = Some("."), descr = "Base output directory for benchmark results. " +
      "Default is the current directory. The final output will go into a subdirectory " +
      "called rapids-tools-benchmark. It will override any directory with the same name.")
  val outputFormat: ScallopOption[String] = opt[String](short = 'f',
    default = Some("text"), descr = "Output format for the benchmark results. For text, " +
      "the result output will be tabular. In case of json, the results " +
      "will be JSON formatted. Currently supported formats are text and json.")
  val extraArgs: ScallopOption[String] = opt[String](short = 'a',
    required = false,
    descr = "Extra arguments to pass to the benchmark. These arguments will be sent to " +
      "the benchmark class. The format is space-separated arguments. For example: " +
      "--output-directory /tmp --per-sql /tmp/eventlogs")
  verify()
}
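
For illustration, a hypothetical invocation that exercises all of these flags via the main() entry point defined in BenchmarkBase below (the event-log path is illustrative):

  // Hypothetical direct invocation; equivalent to passing these flags on the CLI.
  QualificationBenchmark.main(Array(
    "-i", "10",            // 10 measured iterations
    "-w", "2",             // 2 warm-up iterations
    "-o", "/tmp/bench",    // results land under /tmp/bench/<prefix>/
    "-f", "text",          // tabular output
    "-a", "--per-sql /path/to/eventlogs"))  // forwarded to the benchmarked class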
@@ -31,7 +31,10 @@ abstract class BenchmarkBase {
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
* for each benchmark scenario.
*/
- def runBenchmarkSuite(mainArgs: Array[String]): Unit
+ def runBenchmarkSuite(iterations: Int,
+     warmUpIterations: Int,
+     outputFormat: String,
+     mainArgs: Array[String]): Unit

final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
@@ -49,8 +52,9 @@
def afterAll(): Unit = {}

def main(args: Array[String]): Unit = {
- // TODO: get the dirRoot from the arguments instead
- val dirRoot = ""
+
+ val benchArgs = new BenchmarkArgs(args)
+ val dirRoot = benchArgs.outputDirectory().stripSuffix("/")
  val resultFileName = "results.txt"
  val dir = new File(s"$dirRoot/$prefix/")
  if (!dir.exists()) {
@@ -61,7 +65,10 @@
file.createNewFile()
}
output = Some(new FileOutputStream(file))
- runBenchmarkSuite(args)
+ runBenchmarkSuite(benchArgs.iterations(),
+   benchArgs.warmupIterations(),
+   benchArgs.outputFormat(),
+   benchArgs.extraArgs().split("\\s+").filter(_.nonEmpty))
output.foreach { o =>
if (o != null) {
o.close()
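
One sharp edge worth noting in main(): extraArgs is declared with required = false and no default, so benchArgs.extraArgs() will throw if -a is omitted. A defensive variant (a hypothetical sketch, not part of this PR) could go through Scallop's toOption:

  // Hypothetical guard: fall back to an empty arg list when -a is not supplied.
  val extra: Array[String] = benchArgs.extraArgs.toOption
    .map(_.split("\\s+").filter(_.nonEmpty))
    .getOrElse(Array.empty[String])
  runBenchmarkSuite(benchArgs.iterations(),
    benchArgs.warmupIterations(),
    benchArgs.outputFormat(),
    extra)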
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.tool.benchmarks

import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal

/**
 * Runs the QualificationMain class as a benchmark. This can be used as a
 * reference for writing any new benchmark class.
 * Usage:
 * 1. Override the runBenchmarkSuite method.
 * 2. Write the benchmark code inside runBenchmark, passing the relevant arguments.
 * 3. Put the code to be benchmarked inside the added case.
 */
object QualificationBenchmark extends BenchmarkBase {
  override def runBenchmarkSuite(iterations: Int,
      warmUpIterations: Int,
      outputFormat: String,
      mainArgs: Array[String]): Unit = {
    runBenchmark("QualificationBenchmark") {
      val benchmarker =
        new Benchmark(
          "QualificationBenchmark",
          2,
          output = output,
          outputPerIteration = true,
          warmUpIterations = warmUpIterations,
          minNumIters = iterations)
      benchmarker.addCase("QualificationBenchmark") { _ =>
        mainInternal(new QualificationArgs(mainArgs),
          printStdout = true, enablePB = true)
      }
      benchmarker.run()
    }
  }
}

Review thread on the mainInternal call:

Owner: I am a little bit confused here. If we initialize QualificationArgs on line 21, then what are those default arguments that are passed to the benchmarker variable?

Collaborator (author): Clarified post discussion. The benchmarker arguments control the iterations and warm-up iterations; the function passed to the benchmarker is what is benchmarked internally.

Owner: We will iterate on this later.
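
As a closing note on the suite itself: the report's Relative column normalizes every case against the first case's best time, so the suite becomes more informative once a second case is added. A hypothetical sketch, placed inside the same runBenchmark block (the --per-sql flag is taken from the BenchmarkArgs example above):

  // Hypothetical second case: qualification with per-SQL reporting enabled,
  // reported relative to the first case's best time.
  benchmarker.addCase("QualificationPerSql") { _ =>
    mainInternal(new QualificationArgs(mainArgs :+ "--per-sql"),
      printStdout = true, enablePB = true)
  }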