diff --git a/core/pom.xml b/core/pom.xml
index feb055591..d71f1eb3e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -446,6 +446,8 @@
           3.11.0
           ${java.version}
           ${java.version}
+          false
+          false
           ${platform-encoding}
           ${platform-encoding}
           ${platform-encoding}
@@ -702,8 +704,10 @@
             -feature
             -explaintypes
             -Yno-adapted-args
-            -Ywarn-unused:imports
+            -Ywarn-unused:imports,locals,patvars,privates
             -Xlint:missing-interpolator
+            -Xfatal-warnings
+            -Wconf:cat=lint-adapted-args:e
             -Xms1024m
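Note: the scalac changes above drive everything that follows. `-Ywarn-unused` now also covers locals, pattern variables and privates, `-Xfatal-warnings` turns every remaining warning into a compile error, and `-Wconf:cat=lint-adapted-args:e` upgrades the adapted-arguments lint to an error as well (the `:e` action). As a rough sketch (illustrative code, not taken from this repository), a binding like `jobId` below is now rejected instead of merely flagged:

    // fails under -Ywarn-unused:patvars plus -Xfatal-warnings: jobId is bound but never read
    def stageIdsPerJob(jobs: Map[Int, Seq[Int]]): Seq[Int] =
      jobs.flatMap { case (jobId, stageIds) => stageIds }.toSeq

    // the fix used throughout this patch: bind anything you do not read to the wildcard
    def stageIdsPerJobFixed(jobs: Map[Int, Seq[Int]]): Seq[Int] =
      jobs.flatMap { case (_, stageIds) => stageIds }.toSeq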
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
index e9f24b8b4..12b980181 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
@@ -185,7 +185,7 @@ object EventLogPathProcessor extends Logging {
         }.toMap
       }
     } catch {
-      case fe: FileNotFoundException =>
+      case _: FileNotFoundException =>
        logWarning(s"$pathString not found, skipping!")
        Map.empty[EventLogInfo, Long]
      case e: Exception =>
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
index 67a36e8b9..7fe47aaa0 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -133,8 +133,8 @@ object PlatformFactory extends Logging {
    *
    * @param platformKey The key representing the desired platform.
    * @return An instance of the specified platform.
-   * @throws IllegalArgumentException if the specified platform key is not supported.
    */
+  @throws[IllegalArgumentException]
   @tailrec
   def createInstance(platformKey: String = PlatformNames.DEFAULT): Platform = {
     platformKey match {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
index e1d76ba0b..446eb389c 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/DataWritingCommandExecParser.scala
@@ -57,13 +57,11 @@ object DataWritingCommandExecParser {
   val saveIntoDataSrcCMD = "SaveIntoDataSourceCommand"
   val insertIntoHadoopCMD = "InsertIntoHadoopFsRelationCommand"
 
-  // List of writeExecs that represent a physical command.
+  // Note: List of writeExecs that represent a physical command.
   // hardcode because InsertIntoHadoopFsRelationCommand uses this same exec
   // and InsertIntoHadoopFsRelationCommand doesn't have an entry in the
-  // supported execs file
-  private val physicalWriteCommands = Set(
-    defaultPhysicalCMD
-  )
+  // supported execs file Set(defaultPhysicalCMD)
+
 
   // A set of the logical commands that will be mapped to the physical write command
   // which has an entry in the speedupSheet
@@ -76,13 +74,9 @@
     saveIntoDataSrcCMD
   )
 
-  // Defines a list of the execs that include formatted data.
+  // Note: Defines a list of the execs that include formatted data.
   // This will be used to extract the format and then check whether the
-  // format is supported or not.
-  private val formattedWriteCommands = Set(
-    dataWriteCMD,
-    insertIntoHadoopCMD
-  )
+  // format is supported or not. Set(dataWriteCMD, insertIntoHadoopCMD)
 
   // For now, we map the SaveIntoDataSourceCommand to defaultPhysicalCMD because we do not
   // have speedup entry for the deltaLake write operation
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala
index 02243ba37..62e63d3d2 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala
@@ -99,7 +99,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
       }
     }
     val allJobStageRows = apps.flatMap { app =>
-      app.jobIdToInfo.flatMap { case (id, jc) =>
+      app.jobIdToInfo.flatMap { case (_, jc) =>
         val stageIdsInJob = jc.stageIds
         val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
           stageIdsInJob.contains(sid)
@@ -107,7 +107,7 @@
         if (stagesInJob.isEmpty) {
           None
         } else {
-          stagesInJob.map { case ((id, said), sc) =>
+          stagesInJob.map { case ((id, _), sc) =>
             val tasksInStage = app.taskEnd.filter { tc =>
               tc.stageId == id
             }
@@ -153,7 +153,7 @@
     }
     // stages that are missing from a job, perhaps dropped events
     val stagesWithoutJobs = apps.flatMap { app =>
-      val allStageinJobs = app.jobIdToInfo.flatMap { case (id, jc) =>
+      val allStageinJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
         val stageIdsInJob = jc.stageIds
         app.stageIdToInfo.filterKeys { case (sid, _) =>
           stageIdsInJob.contains(sid)
@@ -224,7 +224,6 @@
       } else {
         Seq.empty
       }
-
     }
 
     // SQL Level TaskMetrics Aggregation(Only when SQL exists)
@@ -232,7 +231,6 @@
     val allRows = apps.flatMap { app =>
       app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
         val jcs = app.jobIdToInfo.filter { case (_, jc) =>
-          val jcid = jc.sqlID.getOrElse(-1)
           jc.sqlID.getOrElse(-1) == sqlId
         }
         if (jcs.isEmpty) {
@@ -431,10 +429,6 @@
         }
       }
 
-      val groupedTasks = tasksWithSkew.groupBy { tc =>
-        (tc.stageId, tc.stageAttemptId)
-      }
-
       tasksWithSkew.map { tc =>
         val avgShuffleDur = avgsStageInfos.get((tc.stageId, tc.stageAttemptId))
         avgShuffleDur match {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
index 78a6cd5eb..6b577f437 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
@@ -147,8 +147,6 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
       val rp = app.resourceProfIdToInfo.get(rpId)
       val execMem = rp.map(_.executorResources.get(ResourceProfile.MEMORY)
         .map(_.amount).getOrElse(0L))
-      val execCores = rp.map(_.executorResources.get(ResourceProfile.CORES)
-        .map(_.amount).getOrElse(0L))
       val execGpus = rp.map(_.executorResources.get("gpu")
         .map(_.amount).getOrElse(0L))
       val taskCpus = rp.map(_.taskResources.get(ResourceProfile.CPUS)
@@ -177,7 +175,7 @@
   // get job related information
   def getJobInfo: Seq[JobInfoProfileResult] = {
     val allRows = apps.flatMap { app =>
-      app.jobIdToInfo.map { case (jobId, j) =>
+      app.jobIdToInfo.map { case (_, j) =>
         JobInfoProfileResult(app.index, j.jobID, j.stageIds, j.sqlID, j.startTime, j.endTime)
       }
     }
@@ -270,7 +268,6 @@ object CollectInformation extends Logging {
       app.allSQLMetrics.map { metric =>
         val sqlId = metric.sqlID
         val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
-          val jcid = jc.sqlID.getOrElse(-1)
           jc.sqlID.getOrElse(-1) == sqlId
         }
         val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSeq
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index a3fff6067..479d29784 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -102,7 +102,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       // combine them into single tables in the output.
       val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/combined",
         Profiler.COMBINED_LOG_FILE_NAME_PREFIX, numOutputRows, outputCSV = outputCSV)
-      val sums = createAppsAndSummarize(eventLogInfos, false, profileOutputWriter)
+      val sums = createAppsAndSummarize(eventLogInfos, profileOutputWriter)
       writeSafelyToOutput(profileOutputWriter, sums, outputCombined)
       profileOutputWriter.close()
     }
@@ -159,7 +159,6 @@
   }
 
   private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
-    var errorCodes = ArrayBuffer[Int]()
     val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()
 
     class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -201,9 +200,7 @@
   }
 
   private def createAppsAndSummarize(allPaths: Seq[EventLogInfo],
-      printPlans: Boolean,
       profileOutputWriter: ProfileOutputWriter): Seq[ApplicationSummaryInfo] = {
-    var errorCodes = ArrayBuffer[Int]()
     val allApps = new ConcurrentLinkedQueue[ApplicationSummaryInfo]()
 
     class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -253,7 +250,7 @@
 
   private def createAppAndProcess(
       allPaths: Seq[EventLogInfo],
-      startIndex: Int = 1): Unit = {
+      startIndex: Int): Unit = {
     class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
       def run: Unit = {
         try {
@@ -309,7 +306,7 @@
         logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
         Some(app)
       } catch {
-        case json: com.fasterxml.jackson.core.JsonParseException =>
+        case _: com.fasterxml.jackson.core.JsonParseException =>
           logWarning(s"Error parsing JSON: $path")
           None
         case il: IllegalArgumentException =>
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
index 0ff5bd614..3c16c502b 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
@@ -37,9 +37,6 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
     speedupFactorFile: Option[String] = None) extends Logging {
 
   private val NS = "NS"
-  private val PS = "PS"
-  private val PSPART = "PS*"
-  private val SPART = "S*"
   // configured off
   private val CO = "CO"
   private val NA = "NA"
@@ -231,7 +228,7 @@
       case "float" => Seq("real")
      case "decimal" => Seq("dec", "numeric")
      case "calendar" => Seq("interval")
-      case other => Seq.empty[String]
+      case _ => Seq.empty[String]
    }
  }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index bd1b91654..43ced9605 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -520,8 +520,8 @@
   private def constructOutputRow(
       strAndSizes: Buffer[(String, Int)],
-      delimiter: String = TEXT_DELIMITER,
-      prettyPrint: Boolean = false): String = {
+      delimiter: String,
+      prettyPrint: Boolean): String = {
     val entireHeader = new StringBuffer
     if (prettyPrint) {
       entireHeader.append(delimiter)
@@ -866,7 +866,7 @@
   private def constructExecInfoBuffer(
       info: ExecInfo,
       appId: String,
-      delimiter: String = TEXT_DELIMITER,
+      delimiter: String,
       prettyPrint: Boolean,
       headersAndSizes: LinkedHashMap[String, Int],
       reformatCSV: Boolean = true): String = {
@@ -1125,7 +1125,7 @@
   private def constructDetailedAppInfoCSVRow(
       appInfo: FormattedQualificationSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],
-      reportReadSchema: Boolean = false,
+      reportReadSchema: Boolean,
       reformatCSV: Boolean = true): ListBuffer[(String, Int)] = {
     val reformatCSVFunc : String => String = if (reformatCSV) str => StringUtils.reformatCSVString(str)
       else str => stringIfempty(str)
@@ -1198,7 +1198,7 @@
   private def constructStatusReportInfo(
       statusInfo: StatusSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],
-      delimiter: String = TEXT_DELIMITER,
+      delimiter: String,
       prettyPrint: Boolean,
       reformatCSV: Boolean = true): Seq[String] = {
     val reformatCSVFunc: String => String =
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
index 42ecfbe6a..8789fa823 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
@@ -155,7 +155,8 @@ class RunningQualificationApp(
     val sqlInfo = aggregatePerSQLStats(sqlID)
     val csvResult = constructPerSqlResult(sqlInfo, QualOutputWriter.CSV_DELIMITER, false,
       escapeCSV = true)
-    val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER, true)
+    val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER,
+      prettyPrint = true)
     (csvResult, textResult)
   }
 
@@ -176,8 +177,8 @@
   private def constructPerSqlResult(
       sqlInfo: Option[EstimatedPerSQLSummaryInfo],
-      delimiter: String = "|",
-      prettyPrint: Boolean = true,
+      delimiter: String,
+      prettyPrint: Boolean,
       sqlDescLength: Int = SQL_DESC_LENGTH,
       escapeCSV: Boolean = false): String = {
     sqlInfo match {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index 527b7de58..7ffea6673 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -203,9 +203,9 @@ abstract class AppBase(
     val fs = eventLogPath.getFileSystem(hconf)
     var totalNumEvents = 0
     val readerOpt = eventLog match {
-      case dblog: DatabricksEventLog =>
+      case _: DatabricksEventLog =>
         Some(new DatabricksRollingEventLogFilesFileReader(fs, eventLogPath))
-      case apachelog => EventLogFileReader(fs, eventLogPath)
+      case _ => EventLogFileReader(fs, eventLogPath)
     }
 
     if (readerOpt.isDefined) {
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
index 02b7fe5b3..261875673 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
@@ -311,7 +311,7 @@ object AppFilterImpl {
     val timeInt = try {
       timeStr.toInt
     } catch {
-      case ne: NumberFormatException =>
+      case _: NumberFormatException =>
         throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " +
           "time must be greater than 0 and valid periods are min(minute),h(hours)" +
           ",d(days),w(weeks),m(months).")
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
index 0b45817c5..b946cb2c5 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
@@ -330,7 +330,7 @@ class ApplicationInfo(
   }
 
   def aggregateSQLStageInfo: Seq[SQLStageInfoProfileResult] = {
-    val jobsWithSQL = jobIdToInfo.filter { case (id, j) =>
+    val jobsWithSQL = jobIdToInfo.filter { case (_, j) =>
       j.sqlID.nonEmpty
     }
     val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
@@ -360,7 +360,7 @@ class ApplicationInfo(
     val res = this.appInfo
 
     val estimatedResult = this.appEndTime match {
-      case Some(t) => this.appEndTime
+      case Some(_) => this.appEndTime
       case None =>
         val jobEndTimes = jobIdToInfo.map { case (_, jc) => jc.endTime }.filter(_.isDefined)
         val sqlEndTimes = sqlIdToInfo.map { case (_, sc) => sc.endTime }.filter(_.isDefined)
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index a58d1e646..d1571b292 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -16,8 +16,6 @@
 
 package org.apache.spark.sql.rapids.tool.qualification
 
-import java.util.concurrent.TimeUnit
-
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import com.nvidia.spark.rapids.tool.EventLogInfo
@@ -31,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
-import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, IgnoreExecs, SupportedMLFuncsName, ToolUtils}
+import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, SupportedMLFuncsName, ToolUtils}
 
 class QualificationAppInfo(
     eventLogInfo: Option[EventLogInfo],
@@ -106,7 +104,7 @@ class QualificationAppInfo(
     if (startTime > 0) {
       val estimatedResult =
         this.appEndTime match {
-          case Some(t) => this.appEndTime
+          case Some(_) => this.appEndTime
           case None =>
             if (lastSQLEndTime.isEmpty && lastJobEndTime.isEmpty) {
               None
@@ -312,7 +310,7 @@ class QualificationAppInfo(
         }
         val transitionsTime = numTransitions match {
           case 0 => 0L // no transitions
-          case gpuCpuTransitions =>
+          case _ =>
             // Duration to transfer data from GPU to CPU and vice versa.
             // Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
             // spilled to disk.
@@ -323,13 +321,11 @@ class QualificationAppInfo(
             }
             if (totalBytesRead > 0) {
               val transitionTime = (totalBytesRead /
-                QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions
+                QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * numTransitions
               (transitionTime * 1000).toLong // convert to milliseconds
             } else {
               0L
             }
-
-          case _ => 0L
         }
         val finalEachStageUnsupported = if (transitionsTime != 0) {
           // Add 50% penalty for unsupported duration if there are transitions. This number
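Note: in the rewritten match above, the trailing `case _ => 0L` arm was unreachable (the variable pattern before it already matched every value) and is removed; the zero-transition case is still handled by `case 0`. The cost charged to a stage is totalBytesRead / CPU_GPU_TRANSFER_RATE seconds per transition, multiplied by numTransitions and reported in milliseconds. A quick numeric check (the rate below is a placeholder; the real constant lives in QualificationAppInfo and is not shown in this hunk):

    val totalBytesRead = 2L * 1024 * 1024 * 1024      // stage read 2 GiB
    val cpuGpuTransferRate = 1L * 1024 * 1024 * 1024  // assume 1 GiB/s purely for illustration
    val numTransitions = 3
    val transitionTime = (totalBytesRead / cpuGpuTransferRate.toDouble) * numTransitions
    (transitionTime * 1000).toLong                    // 6000 ms charged to this stage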
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/RunningQualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/RunningQualificationEventProcessor.scala
index 850a918d0..f269face9 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/RunningQualificationEventProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/RunningQualificationEventProcessor.scala
@@ -241,7 +241,7 @@ class RunningQualificationEventProcessor(sparkConf: SparkConf) extends SparkList
     listener.onOtherEvent(event)
     event match {
       case e: SparkListenerSQLExecutionStart =>
-        logDebug("Starting new SQL query")
+        logDebug(s"Starting new SQL query: ${e.executionId}")
       case e: SparkListenerSQLExecutionEnd =>
         writeSQLDetails(e.executionId)
       case _ =>
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
index 46187da32..bdcae9aa5 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala
@@ -34,8 +34,8 @@ object EventUtils extends Logging {
    *
    * @param data value stored in the (value/update) of the AccumulableInfo
    * @return valid parsed long of the content or the duration
-   * @throws java.lang.NullPointerException if the argument is `null`
    */
+  @throws[NullPointerException]
   def parseAccumFieldToLong(data: Any): Option[Long] = {
     val strData = data.toString
     try {
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
index 5d9d09c64..8214b3de6 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/ToolTestUtils.scala
@@ -137,7 +137,7 @@ object ToolTestUtils extends Logging {
 
   def processProfileApps(logs: Array[String],
       sparkSession: SparkSession): ArrayBuffer[ApplicationInfo] = {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(logs)
     var index: Int = 1
     for (path <- appArgs.eventlog()) {
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
index f8d372995..0d8806237 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/planparser/SqlPlanParserSuite.scala
@@ -59,7 +59,6 @@ class SQLPlanParserSuite extends BaseTestSuite {
     assert(t.forall(_.children.isEmpty), s"$extraText $t")
     if (expectedDur.nonEmpty) {
       val durations = t.map(_.duration)
-      val foo = durations.diff(expectedDur)
       assert(durations.diff(expectedDur).isEmpty,
         s"$extraText durations differ expected ${expectedDur.mkString(",")} " +
         s"but got ${durations.mkString(",")}")
@@ -233,13 +232,11 @@ class SQLPlanParserSuite extends BaseTestSuite {
       val planInfo = SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "",
         pluginTypeChecker, app)
       val allExecInfo = planInfo.execInfo
-      val expectedAllExecInfoSize = if (ToolUtils.isSpark320OrLater()) {
-        // AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead, Exchange, WholeStageCodegen
-        5
-      } else {
-        // WholeStageCodegen, Exchange, WholeStageCodegen
-        3
-      }
+      // Note that:
+      // Spark320+ will generate the following execs:
+      // AdaptiveSparkPlan, WholeStageCodegen, AQEShuffleRead, Exchange, WholeStageCodegen
+      // Other Sparks:
+      // WholeStageCodegen, Exchange, WholeStageCodegen
       val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen"))
       assert(wholeStages.size == 2)
       // Expanding the children of WholeStageCodegen
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
index f3d382e56..dca5cff9a 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
@@ -109,7 +109,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test rapids jar") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rapids_join_eventlog.zstd"))
     var index: Int = 1
@@ -175,7 +175,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test printSQLPlanMetrics") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rapids_join_eventlog.zstd"))
     var index: Int = 1
@@ -203,7 +203,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   test("test printSQLPlans") {
     TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/rapids_join_eventlog.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -222,8 +222,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test read GPU datasourcev1") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/eventlog-gpu-dsv1.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -260,8 +260,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test read GPU datasourcev2") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/eventlog-gpu-dsv2.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -294,8 +294,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test read datasourcev1") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/eventlog_dsv1.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -328,8 +328,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test read datasourcev2") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/eventlog_dsv2.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -365,8 +365,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test IOMetrics") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$logDir/eventlog-gpu-dsv1.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -393,8 +393,8 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test jdbc read") {
-    TrampolineUtil.withTempDir { tempOutputDir =>
-      var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    TrampolineUtil.withTempDir { _ =>
+      val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
       val appArgs = new ProfileArgs(Array(s"$qualLogDir/jdbc_eventlog.zstd"))
       var index: Int = 1
       val eventlogPaths = appArgs.eventlog()
@@ -419,7 +419,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test printJobInfo") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rp_sql_eventlog.zstd"))
     var index: Int = 1
@@ -450,7 +450,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test sqlToStages") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rp_sql_eventlog.zstd"))
     var index: Int = 1
@@ -474,7 +474,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test wholeStage mapping") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rp_sql_eventlog.zstd"))
     var index: Int = 1
@@ -505,7 +505,7 @@
 
   test("test multiple resource profile in single app") {
-    var apps :ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps :ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rp_nosql_eventlog"))
     var index: Int = 1
     val eventlogPaths = appArgs.eventlog()
@@ -533,7 +533,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   }
 
   test("test spark2 and spark3 event logs") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd",
       s"$logDir/spark2-eventlog.zstd"))
     var index: Int = 1
@@ -693,7 +693,7 @@
     }
     assert(apps.size == 1)
     val collect = new CollectInformation(apps)
-    for (app <- apps) {
+    for (_ <- apps) {
       val rapidsProps = collect.getProperties(rapidsOnly = true)
       val rows = rapidsProps.map(_.rows.head)
       assert(rows.length == 5) // 5 properties captured.
@@ -711,7 +711,7 @@
   }
 
   test("test executor info local mode") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/spark2-eventlog.zstd"))
     var index: Int = 1
@@ -731,7 +731,7 @@
   }
 
   test("test executor info cluster mode") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd"))
     var index: Int = 1
@@ -866,7 +866,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
 
   test("test collectionAccumulator") {
     TrampolineUtil.withTempDir { eventLogDir =>
-      val (eventLog, appId) = ToolTestUtils.generateEventLog(eventLogDir, "collectaccum") { spark =>
+      val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "collectaccum") { spark =>
         val a = spark.sparkContext.collectionAccumulator[Long]("testCollect")
         val rdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4))
         // run something to add it to collectionAccumulator
@@ -881,7 +881,7 @@
           collectFileDir.getAbsolutePath,
           eventLog))
 
-        var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+        val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
         var index: Int = 1
         val eventlogPaths = appArgs.eventlog()
         for (path <- eventlogPaths) {
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/CompareSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/CompareSuite.scala
index a8b9e84f3..36eb467ce 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/CompareSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/CompareSuite.scala
@@ -31,7 +31,7 @@ class CompareSuite extends FunSuite {
   private val logDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
 
   test("test spark2 and spark3 event logs compare") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd",
       s"$logDir/spark2-eventlog.zstd"))
     var index: Int = 1
@@ -54,7 +54,7 @@ class CompareSuite extends FunSuite {
   }
 
   test("test 2 app runs event logs compare") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/rapids_join_eventlog2.zstd",
       s"$logDir/rapids_join_eventlog.zstd"))
     var index: Int = 1
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/GenerateTimelineSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/GenerateTimelineSuite.scala
index 4ca730dda..ef662ef35 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/GenerateTimelineSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/GenerateTimelineSuite.scala
@@ -41,7 +41,7 @@ class GenerateTimelineSuite extends FunSuite with BeforeAndAfterAll with Logging
     }
 
     // create new session for tool to use
-    val spark2 = SparkSession
+    val _ = SparkSession
       .builder()
       .master("local[*]")
       .appName("Rapids Spark Profiling Tool Unit Tests")
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
index 8083661c1..c90f35065 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
@@ -42,7 +42,7 @@ class HealthCheckSuite extends FunSuite {
   private val logDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
 
   test("test task-stage-job-failures") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd"))
     var index: Int = 1
@@ -90,7 +90,7 @@ class HealthCheckSuite extends FunSuite {
   }
 
   test("test blockManager_executors_failures") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd"))
     var index: Int = 1
@@ -123,7 +123,7 @@ class HealthCheckSuite extends FunSuite {
   }
 
   test("test unSupportedSQLPlan") {
-    var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+    val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs = new ProfileArgs(Array(s"$logDir/dataset_eventlog"))
     var index: Int = 1
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala
index c97891c09..ffcf4c2a5 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala
@@ -128,7 +128,6 @@ object QualificationInfoUtils extends Logging {
     )).toDF("id", "name", "city", "country")
     userData.write.json(jsonInputFile.getCanonicalPath)
     val userDataRead = spark.read.json(jsonInputFile.getCanonicalPath)
-    val allUSA = Seq("US", "USa", "USA", "United states", "United states of America")
     userDataRead.createOrReplaceTempView("user_data")
     val cleanCountryUdf = udf(cleanCountry)
     val resDf = userDataRead.withColumn("normalisedCountry", cleanCountryUdf(col("country")))
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
index 9a3640986..539ae9b4f 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
@@ -46,7 +46,6 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
   test("invalid file") {
     val checker = new PluginTypeChecker
     TrampolineUtil.withTempDir { outpath =>
-      val testSchema = "loan_id:boolean,monthly_reporting_period:string,servicer:string"
       val header = "Format,Direction,BOOLEAN\n"
       // text longer then header should throw
       val supText = (header + "parquet,read,NS,NS\n").getBytes(StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 447c203c9..ca13bd7e4 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -401,7 +401,7 @@ class QualificationSuite extends BaseTestSuite {
       "--report-read-schema")
 
     val appArgs = new QualificationArgs(allArgs ++ logFiles)
-    val (exit, sum) = QualificationMain.mainInternal(appArgs)
+    val (exit, sum@_) = QualificationMain.mainInternal(appArgs)
     assert(exit == 0)
 
     val filename = s"$outpath/rapids_4_spark_qualification_output/" +
@@ -738,7 +738,6 @@ class QualificationSuite extends BaseTestSuite {
     TrampolineUtil.withTempDir { outpath =>
       TrampolineUtil.withTempDir { eventLogDir =>
         val tmpParquet = s"$outpath/mlOpsParquet"
-        val sparkVersion = ToolUtils.sparkRuntimeVersion
         createDecFile(sparkSession, tmpParquet)
         val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "dot") { spark =>
           val data = Array(
@@ -747,7 +746,7 @@ class QualificationSuite extends BaseTestSuite {
           Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
         )
         val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
-        val pca = new PCA()
+        new PCA()
           .setInputCol("features")
           .setOutputCol("pcaFeatures")
           .setK(3)
@@ -1042,7 +1041,6 @@ class QualificationSuite extends BaseTestSuite {
       assert(detailedOut.nonEmpty)
       assert(detailedOut.startsWith("|") && detailedOut.endsWith("|\n"))
       val stdOut = sumOut.split("\n")
-      val stdOutHeader = stdOut(0).split("\\|")
       val stdOutValues = stdOut(1).split("\\|")
       // index of unsupportedExecs
       val stdOutunsupportedExecs = stdOutValues(stdOutValues.length - 3)
@@ -1064,7 +1062,7 @@ class QualificationSuite extends BaseTestSuite {
         outpath.getAbsolutePath,
         eventLog))
 
-      val (exit, sumInfo) = QualificationMain.mainInternal(appArgs)
+      val (exit, sumInfo@_) = QualificationMain.mainInternal(appArgs)
       assert(exit == 0)
 
       // the code above that runs the Spark query stops the Sparksession
@@ -1119,7 +1117,7 @@
         outpath.getAbsolutePath,
         eventLog))
 
-      val (exit, sumInfo) = QualificationMain.mainInternal(appArgs)
+      val (exit, sumInfo@_) = QualificationMain.mainInternal(appArgs)
       assert(exit == 0)
 
      // the code above that runs the Spark query stops the Sparksession
@@ -1206,7 +1204,7 @@
       "--output-directory",
       outpath.getAbsolutePath())
     val appArgs = new QualificationArgs(allArgs ++ Array(eventLog))
-    val (exit, appSum) = QualificationMain.mainInternal(appArgs)
+    val (exit, appSum@_) = QualificationMain.mainInternal(appArgs)
     assert(exit == 0)
 
     val filename = s"$outpath/rapids_4_spark_qualification_output/" +
@@ -1333,7 +1331,7 @@
         outpath.getAbsolutePath,
         eventLog))
 
-      val (exit, sumInfo) =
+      val (exit, sumInfo@_) =
        QualificationMain.mainInternal(appArgs)
      assert(exit == 0)
@@ -1459,7 +1457,7 @@ class QualificationSuite extends BaseTestSuite {
         outpath.getAbsolutePath,
         eventLog))
 
-      val (exit, sumInfo) = QualificationMain.mainInternal(appArgs)
+      val (exit, sumInfo@_) = QualificationMain.mainInternal(appArgs)
       assert(exit == 0)
 
       // the code above that runs the Spark query stops the Sparksession
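Note: the `sum@_`, `sumInfo@_` and `appSum@_` spellings in the test changes above bind the name to the wildcard pattern: the tuple shape and the descriptive name stay in the source, while the binding is marked as deliberately ignored, which is the spelling this patch uses to keep the unused-pattern-variable lint quiet wherever the name is worth keeping. A tiny sketch of the idiom (hypothetical function, not the real QualificationMain):

    def mainInternalSketch(): (Int, String) = (0, "summary")

    val (exit, summary@_) = mainInternalSketch()   // summary is intentionally unused
    assert(exit == 0)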