Adding GC Metrics #12
Changes from commits: f557976, 4eb9997, f1664df, 58a7d9f, 94f7f93
@@ -24,7 +24,7 @@ import scala.concurrent.duration.NANOSECONDS

import org.apache.commons.io.output.TeeOutputStream

import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, RuntimeUtil, ToolsTimer}

/**
 * This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -37,7 +37,7 @@ import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
 * This will output the average time to run each function and the rate of each function.
 */
class Benchmark(
    name: String,
    name: String = "Benchmarker",
    valuesPerIteration: Long,
    minNumIters: Int,
    warmUpIterations: Int,
@@ -99,18 +99,31 @@ class Benchmark(

    val firstBest = results.head.bestMs
    // The results are going to be processor specific so it is useful to include that.
    out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
    val jvmInfo = RuntimeUtil.getJVMOSInfo
    out.printf(s"%-26s -> %-30s \n","JVM Name", jvmInfo("jvm.name"))
    out.printf(s"%-26s -> %-30s \n","Java Version",jvmInfo("jvm.version"))
    out.printf(s"%-26s -> %-30s \n","OS Name",jvmInfo("os.name"))
    out.printf(s"%-26s -> %-30s \n","OS Version",jvmInfo("os.version"))
    out.printf(s"%-26s -> %-30s \n","MaxHeapMemory", ("%6d" format Runtime.getRuntime.maxMemory()/1024/1024)+"MB")
    out.printf(s"%-26s -> %-30s \n","Total Warm Up Iterations","%2d" format warmUpIterations)
    out.printf(s"%-26s -> %-30s \n \n","Total Runtime Iterations","%2d" format minNumIters)
    val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
    out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
      name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Relative")
    out.println("-" * (nameLen + 80))
    out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
      name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
      "Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
    out.println("-" * (nameLen + 160))
    results.zip(benchmarks).foreach { case (result, benchmark) =>
      out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
      out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
        benchmark.name,
        "%5.0f" format result.bestMs,
        "%4.0f" format result.avgMs,
        "%5.0f" format result.stdevMs,
        "%3.1fX" format (firstBest / result.bestMs))
        "%5.1f" format result.memoryParams.avgGCTime,
        "%5.1f" format result.memoryParams.avgGCCount,
        "%5.0f" format result.memoryParams.stdDevGCCount,
        "%5d" format result.memoryParams.maxGcTime,
        "%5d" format result.memoryParams.maxGCCount,
        "%3.2fX" format (firstBest / result.bestMs))
    }
    out.println()
  }
@@ -126,14 +139,17 @@ class Benchmark(
    }
    val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
    val runTimes = ArrayBuffer[Long]()
    var totalTime = 0L
    val gcCounts = ArrayBuffer[Long]()
    val gcTimes = ArrayBuffer[Long]()
    // For tracking maximum GC over iterations
    for (i <- 0 until minIters) {
      val timer = new ToolsTimer(i)
      val memoryTracker = new MemoryMetricsTracker
      f(timer)
      val runTime = timer.totalTime()
      runTimes += runTime
      totalTime += runTime

      gcCounts += memoryTracker.getTotalGCCount
      gcTimes += memoryTracker.getTotalGCTime
      if (outputPerIteration) {
        // scalastyle:off
        println("*"*80)
@@ -148,17 +164,34 @@ class Benchmark(
    println("*"*80)
    // scalastyle:on
    assert(runTimes.nonEmpty)
    val best = runTimes.min
    val avg = runTimes.sum / runTimes.size
    val stdev = if (runTimes.size > 1) {
      math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
    } else 0
    Benchmark.Result(avg / 1000000.0, best / 1000000.0, stdev / 1000000.0)
    val bestRuntime = runTimes.min
    val avgRuntime = runTimes.sum / runTimes.size
    val stdevRunTime = if (runTimes.size > 1) {
      math.sqrt(runTimes.map(time => (time - avgRuntime) *
        (time - avgRuntime)).sum / (runTimes.size - 1))
    } else {
      0
    }
    val maxGcCount = gcCounts.max
    val stdevGcCount = if (gcCounts.size > 1) {
      math.sqrt(gcCounts.map(gc => (gc - maxGcCount) *
        (gc - maxGcCount)).sum / (gcCounts.size - 1))
    } else {
      0
    }
    val avgGcCount = gcCounts.sum / minIters
    val avgGcTime = gcTimes.sum / minIters
    val maxGcTime = gcTimes.max
    Benchmark.Result(avgRuntime / 1000000.0, bestRuntime / 1000000.0, stdevRunTime / 1000000.0,
      JVMMemoryParams(avgGcTime, avgGcCount, stdevGcCount, maxGcCount, maxGcTime))
  }
}


object Benchmark {
  case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
  case class Result(avgMs: Double, bestMs: Double, stdevMs: Double)
  case class JVMMemoryParams(avgGCTime: Double, avgGCCount: Double,
    stdDevGCCount: Double, maxGCCount: Long, maxGcTime: Long)
  case class Result(avgMs: Double, bestMs: Double, stdevMs: Double,
    memoryParams: JVMMemoryParams)
}
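For a quick, self-contained illustration of the statistics behind the new columns, here is a minimal Scala sketch (not part of the PR; the object name and sample values are made up) that reduces per-iteration run times and GC readings to the best/avg/stdev plus the GC averages and maxima, in the same spirit as the measureOneCase changes above. As in the PR, run times are kept in nanoseconds internally and only converted to milliseconds for reporting.

import scala.collection.mutable.ArrayBuffer

object GcStatsSketch {
  def main(args: Array[String]): Unit = {
    // Values one might have collected once per iteration, as measureOneCase does.
    val runTimes = ArrayBuffer(120000000L, 95000000L, 110000000L) // nanoseconds
    val gcCounts = ArrayBuffer(3L, 2L, 4L)
    val gcTimes = ArrayBuffer(15L, 9L, 21L) // milliseconds

    val bestRuntime = runTimes.min
    val avgRuntime = runTimes.sum / runTimes.size
    val stdevRuntime = if (runTimes.size > 1) {
      math.sqrt(runTimes.map(t => (t - avgRuntime) * (t - avgRuntime)).sum.toDouble /
        (runTimes.size - 1))
    } else 0.0

    val avgGcCount = gcCounts.sum.toDouble / gcCounts.size
    val maxGcCount = gcCounts.max
    val avgGcTime = gcTimes.sum.toDouble / gcTimes.size
    val maxGcTime = gcTimes.max

    // Report in milliseconds, as the benchmark table does.
    println(f"best=${bestRuntime / 1e6}%.1f ms, avg=${avgRuntime / 1e6}%.1f ms, " +
      f"stdev=${stdevRuntime / 1e6}%.1f ms")
    println(f"avg GC count=$avgGcCount%.1f, max GC count=$maxGcCount, " +
      f"avg GC time=$avgGcTime%.1f ms, max GC time=$maxGcTime ms")
  }
}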
@@ -27,23 +27,28 @@ import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
 * 2. Write the benchmark code in the runBenchmark method passing relevant arguments
 * 3. Write benchmarked code inside
 */
object QualificationBenchmark extends BenchmarkBase {
object QualificationToolBenchmark extends BenchmarkBase {

Review comment: we can call this SingleThreadedQualTool and set the number of threads to be 1.
Reply: Changed

  override def runBenchmarkSuite(iterations: Int,
      warmUpIterations: Int,
      outputFormat: String,
      mainArgs: Array[String]): Unit = {
    runBenchmark("QualificationBenchmark") {
    runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
      val benchmarker =
        new Benchmark(
          "QualificationBenchmark",
          2,
          valuesPerIteration = 2,
          output = output,
          outputPerIteration = true,
          warmUpIterations = warmUpIterations,
          minNumIters = iterations)
      benchmarker.addCase("QualificationBenchmark") { _ =>
      benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
        val (prefix,suffix) = mainArgs.splitAt(mainArgs.length - 1)

        mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ suffix.head),
          enablePB = true)
      }
      benchmarker.addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
        mainInternal(new QualificationArgs(mainArgs),
          printStdout = true, enablePB = true)
          enablePB = true)
      }
      benchmarker.run()
    }
@@ -0,0 +1,32 @@
package org.apache.spark.sql.rapids.tool.util

import java.lang.management.ManagementFactory

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

/**
 * Utility class to track memory metrics.
 * This class is used to track memory metrics such as GC count, GC time,
 * heap memory usage, etc.
 *
 */
class MemoryMetricsTracker {
  private val startGCMetrics = getCurrentGCMetrics

  private def getCurrentGCMetrics: (Long, Long) = {
    val gcBeans = ManagementFactory.getGarbageCollectorMXBeans

    (gcBeans.map(_.getCollectionCount).sum,
      gcBeans.map(_.getCollectionTime).sum)
  }

  def getTotalGCCount: Long = {
    val (newGcCount: Long, _) = getCurrentGCMetrics
    newGcCount - startGCMetrics._1
  }

  def getTotalGCTime: Long = {
    val (_, newGcTime: Long) = getCurrentGCMetrics
    newGcTime - startGCMetrics._2
  }
}
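As a rough usage sketch of the new tracker (not part of the PR; the object name, the allocation loop, and the System.gc() call are illustrative only), one creates a MemoryMetricsTracker before the work being measured and reads the deltas afterwards, which is how measureOneCase uses it once per iteration:

import org.apache.spark.sql.rapids.tool.util.MemoryMetricsTracker

object MemoryMetricsTrackerSketch {
  def main(args: Array[String]): Unit = {
    // The tracker snapshots the JVM's cumulative GC count and GC time at construction.
    val tracker = new MemoryMetricsTracker

    // Throwaway allocations to make a collection likely; System.gc() is only a hint.
    var junk: Array[Byte] = null
    for (_ <- 0 until 1000) {
      junk = new Array[Byte](1024 * 1024)
    }
    System.gc()

    // Deltas relative to the snapshot taken when the tracker was created.
    println(s"GC count since start: ${tracker.getTotalGCCount}")
    println(s"GC time since start (ms): ${tracker.getTotalGCTime}")
    println(s"last buffer size: ${junk.length}") // keeps the allocation observable
  }
}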
Review comment: let's make it more standard, ":" instead of "->".
Reply: Corrected