
Update pom to fail on warnings (#701)

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
amahussein authored Dec 26, 2023
1 parent 217c608 commit 03effc2
Showing 25 changed files with 84 additions and 111 deletions.
6 changes: 5 additions & 1 deletion core/pom.xml
@@ -446,6 +446,8 @@
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
+<maven.scaladoc.skip>false</maven.scaladoc.skip>
+<maven.scalastyle.skip>false</maven.scalastyle.skip>
<project.build.sourceEncoding>${platform-encoding}</project.build.sourceEncoding>
<project.reporting.sourceEncoding>${platform-encoding}</project.reporting.sourceEncoding>
<project.reporting.outputEncoding>${platform-encoding}</project.reporting.outputEncoding>
@@ -702,8 +704,10 @@
<arg>-feature</arg>
<arg>-explaintypes</arg>
<arg>-Yno-adapted-args</arg>
-<arg>-Ywarn-unused:imports</arg>
+<arg>-Ywarn-unused:imports,locals,patvars,privates</arg>
<arg>-Xlint:missing-interpolator</arg>
+<arg>-Xfatal-warnings</arg>
+<arg>-Wconf:cat=lint-adapted-args:e</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
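
Note: a minimal sketch (hypothetical code, not from this repository) of what the stricter compiler settings above now reject. Each marked construct used to compile with a warning; with `-Xfatal-warnings`, and with `-Wconf:cat=lint-adapted-args:e` escalating the adapted-arguments lint, they become hard errors.

```scala
// Hypothetical illustration of the new failure modes; assumes the scalac
// flags added in core/pom.xml are active.
import scala.collection.mutable.ArrayBuffer // error under -Ywarn-unused:imports

object WarningDemo {
  private val unusedField = 0 // error under -Ywarn-unused:privates

  def report(pair: Any): Unit = println(pair)

  def run(): Unit = {
    val unusedLocal = 1 // error under -Ywarn-unused:locals
    // report(1, 2)     // args would be adapted to the tuple (1, 2):
    //                  // cat=lint-adapted-args, escalated to an error by -Wconf
    report((1, 2))      // fine: the tuple is explicit
  }
}
```
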
@@ -185,7 +185,7 @@ object EventLogPathProcessor extends Logging {
}.toMap
}
} catch {
-case fe: FileNotFoundException =>
+case _: FileNotFoundException =>
logWarning(s"$pathString not found, skipping!")
Map.empty[EventLogInfo, Long]
case e: Exception =>
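
Note: the many `case x: T =>` to `case _: T =>` edits in this commit follow from the expanded `-Ywarn-unused:imports,locals,patvars,privates` flag: a pattern variable that is bound but never read (like `fe` above) is now a fatal warning. A sketch of the before/after shape, with a hypothetical `riskyRead` helper:

```scala
import java.io.FileNotFoundException

object PatvarDemo {
  def riskyRead(): String = throw new FileNotFoundException("missing") // stand-in

  def main(args: Array[String]): Unit = {
    try {
      println(riskyRead())
    } catch {
      // was: case fe: FileNotFoundException => ... ('fe' unused -> fatal warning)
      case _: FileNotFoundException =>
        println("not found, skipping!")
    }
  }
}
```
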
@@ -133,8 +133,8 @@ object PlatformFactory extends Logging {
*
* @param platformKey The key representing the desired platform.
* @return An instance of the specified platform.
-* @throws IllegalArgumentException if the specified platform key is not supported.
*/
+@throws[IllegalArgumentException]
@tailrec
def createInstance(platformKey: String = PlatformNames.DEFAULT): Platform = {
platformKey match {
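
Note: here the free-text scaladoc `@throws` tag becomes the `@throws[IllegalArgumentException]` annotation (the same swap appears in EventUtils below), presumably so the documented exception is expressed in a form the compiler and scaladoc can check now that warnings are fatal. A hedged sketch of the resulting shape, with a made-up method body:

```scala
import scala.annotation.tailrec

object AnnotatedThrowsDemo {
  // The annotation replaces the '* @throws ...' scaladoc line; @tailrec asks
  // the compiler to verify the self-call is in tail position.
  @throws[IllegalArgumentException]
  @tailrec
  def resolve(key: String): String = key match {
    case "default" => resolve("onprem") // hypothetical alias indirection
    case "onprem" => "onprem"
    case unsupported => throw new IllegalArgumentException(s"unsupported: $unsupported")
  }
}
```
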
@@ -57,13 +57,11 @@ object DataWritingCommandExecParser {
val saveIntoDataSrcCMD = "SaveIntoDataSourceCommand"
val insertIntoHadoopCMD = "InsertIntoHadoopFsRelationCommand"

-// List of writeExecs that represent a physical command.
+// Note: List of writeExecs that represent a physical command.
// hardcode because InsertIntoHadoopFsRelationCommand uses this same exec
// and InsertIntoHadoopFsRelationCommand doesn't have an entry in the
-// supported execs file
-private val physicalWriteCommands = Set(
-defaultPhysicalCMD
-)
+// supported execs file Set(defaultPhysicalCMD)


// A set of the logical commands that will be mapped to the physical write command
// which has an entry in the speedupSheet
@@ -76,13 +74,9 @@
saveIntoDataSrcCMD
)

-// Defines a list of the execs that include formatted data.
+// Note: Defines a list of the execs that include formatted data.
// This will be used to extract the format and then check whether the
-// format is supported or not.
-private val formattedWriteCommands = Set(
-dataWriteCMD,
-insertIntoHadoopCMD
-)
+// format is supported or not. Set(dataWriteCMD, insertIntoHadoopCMD)

// For now, we map the SaveIntoDataSourceCommand to defaultPhysicalCMD because we do not
// have speedup entry for the deltaLake write operation
@@ -99,15 +99,15 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}
val allJobStageRows = apps.flatMap { app =>
-app.jobIdToInfo.flatMap { case (id, jc) =>
+app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
}
if (stagesInJob.isEmpty) {
None
} else {
-stagesInJob.map { case ((id, said), sc) =>
+stagesInJob.map { case ((id, _), sc) =>
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == id
}
@@ -153,7 +153,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
// stages that are missing from a job, perhaps dropped events
val stagesWithoutJobs = apps.flatMap { app =>
-val allStageinJobs = app.jobIdToInfo.flatMap { case (id, jc) =>
+val allStageinJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
val stageIdsInJob = jc.stageIds
app.stageIdToInfo.filterKeys { case (sid, _) =>
stageIdsInJob.contains(sid)
@@ -224,15 +224,13 @@ class Analysis(apps: Seq[ApplicationInfo]) {
} else {
Seq.empty
}
-
}

// SQL Level TaskMetrics Aggregation(Only when SQL exists)
def sqlMetricsAggregation(): Seq[SQLTaskAggMetricsProfileResult] = {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
-val jcid = jc.sqlID.getOrElse(-1)
jc.sqlID.getOrElse(-1) == sqlId
}
if (jcs.isEmpty) {
@@ -431,10 +429,6 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}

-val groupedTasks = tasksWithSkew.groupBy { tc =>
-(tc.stageId, tc.stageAttemptId)
-}
-
tasksWithSkew.map { tc =>
val avgShuffleDur = avgsStageInfos.get((tc.stageId, tc.stageAttemptId))
avgShuffleDur match {
@@ -147,8 +147,6 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
val rp = app.resourceProfIdToInfo.get(rpId)
val execMem = rp.map(_.executorResources.get(ResourceProfile.MEMORY)
.map(_.amount).getOrElse(0L))
-val execCores = rp.map(_.executorResources.get(ResourceProfile.CORES)
-.map(_.amount).getOrElse(0L))
val execGpus = rp.map(_.executorResources.get("gpu")
.map(_.amount).getOrElse(0L))
val taskCpus = rp.map(_.taskResources.get(ResourceProfile.CPUS)
@@ -177,7 +175,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
// get job related information
def getJobInfo: Seq[JobInfoProfileResult] = {
val allRows = apps.flatMap { app =>
-app.jobIdToInfo.map { case (jobId, j) =>
+app.jobIdToInfo.map { case (_, j) =>
JobInfoProfileResult(app.index, j.jobID, j.stageIds, j.sqlID, j.startTime, j.endTime)
}
}
@@ -270,7 +268,6 @@ object CollectInformation extends Logging {
app.allSQLMetrics.map { metric =>
val sqlId = metric.sqlID
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
-val jcid = jc.sqlID.getOrElse(-1)
jc.sqlID.getOrElse(-1) == sqlId
}
val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSeq
@@ -102,7 +102,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
// combine them into single tables in the output.
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/combined",
Profiler.COMBINED_LOG_FILE_NAME_PREFIX, numOutputRows, outputCSV = outputCSV)
-val sums = createAppsAndSummarize(eventLogInfos, false, profileOutputWriter)
+val sums = createAppsAndSummarize(eventLogInfos, profileOutputWriter)
writeSafelyToOutput(profileOutputWriter, sums, outputCombined)
profileOutputWriter.close()
}
@@ -159,7 +159,6 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
}

private def createApps(allPaths: Seq[EventLogInfo]): Seq[ApplicationInfo] = {
-var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -201,9 +200,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
}

private def createAppsAndSummarize(allPaths: Seq[EventLogInfo],
-printPlans: Boolean,
profileOutputWriter: ProfileOutputWriter): Seq[ApplicationSummaryInfo] = {
-var errorCodes = ArrayBuffer[Int]()
val allApps = new ConcurrentLinkedQueue[ApplicationSummaryInfo]()

class ProfileThread(path: EventLogInfo, index: Int) extends Runnable {
@@ -253,7 +250,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea

private def createAppAndProcess(
allPaths: Seq[EventLogInfo],
-startIndex: Int = 1): Unit = {
+startIndex: Int): Unit = {
class ProfileProcessThread(path: EventLogInfo, index: Int) extends Runnable {
def run: Unit = {
try {
@@ -309,7 +306,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
Some(app)
} catch {
-case json: com.fasterxml.jackson.core.JsonParseException =>
+case _: com.fasterxml.jackson.core.JsonParseException =>
logWarning(s"Error parsing JSON: $path")
None
case il: IllegalArgumentException =>
@@ -37,9 +37,6 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
speedupFactorFile: Option[String] = None) extends Logging {

private val NS = "NS"
-private val PS = "PS"
-private val PSPART = "PS*"
-private val SPART = "S*"
// configured off
private val CO = "CO"
private val NA = "NA"
@@ -231,7 +228,7 @@ class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
case "float" => Seq("real")
case "decimal" => Seq("dec", "numeric")
case "calendar" => Seq("interval")
-case other => Seq.empty[String]
+case _ => Seq.empty[String]
}
}

@@ -520,8 +520,8 @@ object QualOutputWriter {

private def constructOutputRow(
strAndSizes: Buffer[(String, Int)],
-delimiter: String = TEXT_DELIMITER,
-prettyPrint: Boolean = false): String = {
+delimiter: String,
+prettyPrint: Boolean): String = {
val entireHeader = new StringBuffer
if (prettyPrint) {
entireHeader.append(delimiter)
@@ -866,7 +866,7 @@ object QualOutputWriter {
private def constructExecInfoBuffer(
info: ExecInfo,
appId: String,
-delimiter: String = TEXT_DELIMITER,
+delimiter: String,
prettyPrint: Boolean,
headersAndSizes: LinkedHashMap[String, Int],
reformatCSV: Boolean = true): String = {
@@ -1125,7 +1125,7 @@ object QualOutputWriter {
private def constructDetailedAppInfoCSVRow(
appInfo: FormattedQualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
-reportReadSchema: Boolean = false,
+reportReadSchema: Boolean,
reformatCSV: Boolean = true): ListBuffer[(String, Int)] = {
val reformatCSVFunc : String => String =
if (reformatCSV) str => StringUtils.reformatCSVString(str) else str => stringIfempty(str)
@@ -1198,7 +1198,7 @@ object QualOutputWriter {
private def constructStatusReportInfo(
statusInfo: StatusSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
-delimiter: String = TEXT_DELIMITER,
+delimiter: String,
prettyPrint: Boolean,
reformatCSV: Boolean = true): Seq[String] = {
val reformatCSVFunc: String => String =
@@ -155,7 +155,8 @@ class RunningQualificationApp(
val sqlInfo = aggregatePerSQLStats(sqlID)
val csvResult =
constructPerSqlResult(sqlInfo, QualOutputWriter.CSV_DELIMITER, false, escapeCSV = true)
-val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER, true)
+val textResult = constructPerSqlResult(sqlInfo, QualOutputWriter.TEXT_DELIMITER,
+prettyPrint = true)
(csvResult, textResult)
}

@@ -176,8 +177,8 @@

private def constructPerSqlResult(
sqlInfo: Option[EstimatedPerSQLSummaryInfo],
-delimiter: String = "|",
-prettyPrint: Boolean = true,
+delimiter: String,
+prettyPrint: Boolean,
sqlDescLength: Int = SQL_DESC_LENGTH,
escapeCSV: Boolean = false): String = {
sqlInfo match {
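
Note: alongside dropping the default values, call sites now name the boolean argument (`prettyPrint = true` above), which keeps positional booleans from being misread. A small sketch of that call-site style, with hypothetical names:

```scala
object NamedArgsDemo {
  def render(text: String, delimiter: String, prettyPrint: Boolean): String =
    if (prettyPrint) s"$delimiter $text $delimiter" else text

  def main(args: Array[String]): Unit = {
    // 'prettyPrint = true' reads unambiguously, unlike a bare trailing 'true'.
    println(render("per-SQL summary", delimiter = "|", prettyPrint = true))
  }
}
```
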
@@ -203,9 +203,9 @@ abstract class AppBase(
val fs = eventLogPath.getFileSystem(hconf)
var totalNumEvents = 0
val readerOpt = eventLog match {
-case dblog: DatabricksEventLog =>
+case _: DatabricksEventLog =>
Some(new DatabricksRollingEventLogFilesFileReader(fs, eventLogPath))
-case apachelog => EventLogFileReader(fs, eventLogPath)
+case _ => EventLogFileReader(fs, eventLogPath)
}

if (readerOpt.isDefined) {
@@ -311,7 +311,7 @@ object AppFilterImpl {
val timeInt = try {
timeStr.toInt
} catch {
-case ne: NumberFormatException =>
+case _: NumberFormatException =>
throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " +
"time must be greater than 0 and valid periods are min(minute),h(hours)" +
",d(days),w(weeks),m(months).")
@@ -330,7 +330,7 @@ class ApplicationInfo(
}

def aggregateSQLStageInfo: Seq[SQLStageInfoProfileResult] = {
-val jobsWithSQL = jobIdToInfo.filter { case (id, j) =>
+val jobsWithSQL = jobIdToInfo.filter { case (_, j) =>
j.sqlID.nonEmpty
}
val sqlToStages = jobsWithSQL.flatMap { case (jobId, j) =>
@@ -360,7 +360,7 @@ class ApplicationInfo(
val res = this.appInfo

val estimatedResult = this.appEndTime match {
-case Some(t) => this.appEndTime
+case Some(_) => this.appEndTime
case None =>
val jobEndTimes = jobIdToInfo.map { case (_, jc) => jc.endTime }.filter(_.isDefined)
val sqlEndTimes = sqlIdToInfo.map { case (_, sc) => sc.endTime }.filter(_.isDefined)
@@ -16,8 +16,6 @@

package org.apache.spark.sql.rapids.tool.qualification

-import java.util.concurrent.TimeUnit
-
import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
@@ -31,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
-import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, IgnoreExecs, SupportedMLFuncsName, ToolUtils}
+import org.apache.spark.sql.rapids.tool.{AppBase, GpuEventLogException, SupportedMLFuncsName, ToolUtils}

class QualificationAppInfo(
eventLogInfo: Option[EventLogInfo],
@@ -106,7 +104,7 @@ class QualificationAppInfo(
if (startTime > 0) {
val estimatedResult =
this.appEndTime match {
-case Some(t) => this.appEndTime
+case Some(_) => this.appEndTime
case None =>
if (lastSQLEndTime.isEmpty && lastJobEndTime.isEmpty) {
None
@@ -312,7 +310,7 @@ class QualificationAppInfo(
}
val transitionsTime = numTransitions match {
case 0 => 0L // no transitions
-case gpuCpuTransitions =>
+case _ =>
// Duration to transfer data from GPU to CPU and vice versa.
// Assuming it's a PCI-E Gen3, but also assuming that some of the result could be
// spilled to disk.
@@ -323,13 +321,11 @@
}
if (totalBytesRead > 0) {
val transitionTime = (totalBytesRead /
-QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * gpuCpuTransitions
+QualificationAppInfo.CPU_GPU_TRANSFER_RATE.toDouble) * numTransitions
(transitionTime * 1000).toLong // convert to milliseconds
} else {
0L
}
-
-case _ => 0L
}
val finalEachStageUnsupported = if (transitionsTime != 0) {
// Add 50% penalty for unsupported duration if there are transitions. This number
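
Note: with the shadowing `gpuCpuTransitions` binding gone and the now-unreachable trailing `case _ => 0L` arm removed, the transition-cost logic reduces to the sketch below. The transfer-rate constant is illustrative only, not the project's actual `CPU_GPU_TRANSFER_RATE` value:

```scala
object TransitionCostSketch {
  // Assumed bytes/second for a PCI-E Gen3-like link (illustrative value).
  val CpuGpuTransferRate: Long = 8L * 1024 * 1024 * 1024

  // Milliseconds spent moving data between CPU and GPU, once per transition.
  def transitionsTimeMs(numTransitions: Int, totalBytesRead: Long): Long =
    numTransitions match {
      case 0 => 0L // no transitions
      case _ if totalBytesRead > 0 =>
        val seconds = (totalBytesRead / CpuGpuTransferRate.toDouble) * numTransitions
        (seconds * 1000).toLong // convert to milliseconds
      case _ => 0L // transitions but no bytes read
    }
}
```

For example, 16 GiB read with 2 transitions at the assumed 8 GiB/s rate yields roughly 4000 ms, which then drives the 50% unsupported-duration penalty mentioned in the surrounding code.
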
@@ -241,7 +241,7 @@ class RunningQualificationEventProcessor(sparkConf: SparkConf) extends SparkList
listener.onOtherEvent(event)
event match {
case e: SparkListenerSQLExecutionStart =>
-logDebug("Starting new SQL query")
+logDebug(s"Starting new SQL query: ${e.executionId}")
case e: SparkListenerSQLExecutionEnd =>
writeSQLDetails(e.executionId)
case _ =>
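
Note: the change above does double duty: it uses the otherwise-unused binding `e` (satisfying `-Ywarn-unused:patvars`) and uses a proper `s"..."` interpolator. The added `-Xlint:missing-interpolator` flag guards the opposite mistake, shown in this hypothetical sketch:

```scala
object InterpolatorDemo {
  def main(args: Array[String]): Unit = {
    val executionId = 7L
    // println("Starting new SQL query: ${executionId}") // looks interpolated but is
    //                                                    // a literal string: flagged by
    //                                                    // -Xlint:missing-interpolator
    println(s"Starting new SQL query: $executionId")     // correct: 's' prefix
  }
}
```
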
@@ -34,8 +34,8 @@ object EventUtils extends Logging {
*
* @param data value stored in the (value/update) of the AccumulableInfo
* @return valid parsed long of the content or the duration
-* @throws java.lang.NullPointerException if the argument is `null`
*/
+@throws[NullPointerException]
def parseAccumFieldToLong(data: Any): Option[Long] = {
val strData = data.toString
try {
@@ -137,7 +137,7 @@ object ToolTestUtils extends Logging {

def processProfileApps(logs: Array[String],
sparkSession: SparkSession): ArrayBuffer[ApplicationInfo] = {
-var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs = new ProfileArgs(logs)
var index: Int = 1
for (path <- appArgs.eventlog()) {