From 7b6803fbb3a7ce97008c97482335b26568fa5d10 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Tue, 22 Aug 2023 17:48:02 -0700 Subject: [PATCH] addressed review comments --- .../rapids/tool/qualification/Qualification.scala | 5 +++-- .../tool/qualification/QualificationArgs.scala | 5 +++++ .../tool/qualification/QualificationMain.scala | 3 ++- .../tool/qualification/QualificationAppInfo.scala | 15 +++++++++++---- .../tool/planparser/SqlPlanParserSuite.scala | 2 +- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index f7b624957..0fa13fddb 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -34,7 +34,8 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, timeout: Option[Long], nThreads: Int, order: String, pluginTypeChecker: PluginTypeChecker, reportReadSchema: Boolean, printStdout: Boolean, uiEnabled: Boolean, enablePB: Boolean, - reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean) extends Logging { + reportSqlLevel: Boolean, maxSQLDescLength: Int, mlOpsEnabled:Boolean, + ignoreTransitions: Boolean) extends Logging { private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]() @@ -166,7 +167,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration, try { val startTime = System.currentTimeMillis() val appResult = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker, - reportSqlLevel, mlOpsEnabled) + reportSqlLevel, mlOpsEnabled, ignoreTransitions) val qualAppResult = appResult match { case Left(errorMessage: String) => // Case when an error occurred during QualificationAppInfo creation diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 925001724..dda734f68 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -112,6 +112,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[Boolean](required = false, descr = "Whether to parse ML functions in the eventlogs. Default is false.", default = Some(false)) + val ignoreTransitions: ScallopOption[Boolean] = + opt[Boolean](required = false, + descr = "Whether to ignore durations for ColumnarToRow and RowToColumnar transitions " + + "in the eventlogs while calculating the speedup. Default is false.", + default = Some(false)) val sparkProperty: ScallopOption[List[String]] = opt[List[String]](required = false, descr = "Filter applications based on certain Spark properties that were set during " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 0510b03f0..905a62972 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -60,6 +60,7 @@ object QualificationMain extends Logging { val reportSqlLevel = appArgs.perSql.getOrElse(false) val platform = appArgs.platform.getOrElse("onprem") val mlOpsEnabled = appArgs.mlFunctions.getOrElse(false) + val ignoreTransitions = appArgs.ignoreTransitions.getOrElse(false) val hadoopConf = RapidsToolsConfUtil.newHadoopConf @@ -93,7 +94,7 @@ object QualificationMain extends Logging { val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout, nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled, - enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled) + enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, ignoreTransitions) val res = qual.qualifyApps(filteredLogs) (0, res) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 4794150e9..468d34473 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -39,7 +39,8 @@ class QualificationAppInfo( pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, perSqlOnly: Boolean = false, - mlOpsEnabled: Boolean = false) + mlOpsEnabled: Boolean = false, + ignoreTransitions: Boolean = false) extends AppBase(eventLogInfo, hadoopConf) with Logging { var appId: String = "" @@ -262,8 +263,13 @@ class QualificationAppInfo( stages.map { stageId => val stageTaskTime = stageIdToTaskEndSum.get(stageId) .map(_.totalTaskDuration).getOrElse(0L) - val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + val numTransitions = ignoreTransitions match { + case false => stageIdToGpuCpuTransitions.getOrElse(stageId, 0) + case true => 0 + } + // val numTransitions = stageIdToGpuCpuTransitions.getOrElse(stageId, 0) val transitionsTime = numTransitions match { + case 0 => 0L // no transitions case gpuCpuTransitions if gpuCpuTransitions > 0 => // Duration to transfer data from GPU to CPU and vice versa. // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be @@ -929,10 +935,11 @@ object QualificationAppInfo extends Logging { hadoopConf: Configuration, pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, - mlOpsEnabled: Boolean): Either[String, QualificationAppInfo] = { + mlOpsEnabled: Boolean, + ignoreTransitions: Boolean): Either[String, QualificationAppInfo] = { try { val app = new QualificationAppInfo(Some(path), Some(hadoopConf), pluginTypeChecker, - reportSqlLevel, false, mlOpsEnabled) + reportSqlLevel, false, mlOpsEnabled, ignoreTransitions) logInfo(s"${path.eventLog.toString} has App: ${app.appId}") Right(app) } catch { diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala index ec5877adb..99a5200a4 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala @@ -76,7 +76,7 @@ class SQLPlanParserSuite extends BaseTestSuite { val pluginTypeChecker = new PluginTypeChecker() assert(allEventLogs.size == 1) val appResult = QualificationAppInfo.createApp(allEventLogs.head, hadoopConf, - pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false) + pluginTypeChecker, reportSqlLevel = false, mlOpsEnabled = false, ignoreTransitions = false) appResult match { case Right(app) => app case Left(_) => throw new AssertionError("Cannot create application")