diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala index 5aee8314a..b94f4f521 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala @@ -62,7 +62,7 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) { // factors since we don't have speedup factors for all combinations of // platforms and GPUs. We expect speedup factor usage to be going away // so this is less of an issue. - def defaultGpuForSpeedupFactor: GpuDevice = getGpuOrDefault + def defaultGpuForSpeedupFactor: GpuDevice = defaultGpuDevice /** * Recommendations to be excluded from the list of recommendations. @@ -113,12 +113,15 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) { s"operatorsScore-$platformName-$getGpuOrDefaultForSpeedupFactors.csv" } + def getDefaultOperatorScoreFile: String = { + s"operatorsScore-$platformName-$defaultGpuForSpeedupFactor.csv" + } + final def getGpuOrDefault: GpuDevice = gpuDevice.getOrElse(defaultGpuDevice) final def getGpuOrDefaultForSpeedupFactors: GpuDevice = gpuDevice.getOrElse(defaultGpuForSpeedupFactor) - final def setGpuDevice(gpuDevice: GpuDevice): Unit = { this.gpuDevice = Some(gpuDevice) } @@ -148,7 +151,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) { abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice]) extends Platform(gpuDevice) { override val defaultGpuDevice: GpuDevice = T4Gpu - override def isPlatformCSP: Boolean = true // note that Databricks generally sets the spark.executor.memory for the user. 
Our @@ -279,6 +281,11 @@ object PlatformFactory extends Logging { def createInstance(platformKey: String = PlatformNames.DEFAULT): Platform = { val (platformName, gpuName) = extractPlatformGpuName(platformKey) val gpuDevice = gpuName.flatMap(GpuDevice.createInstance) + // case when gpu name is detected but not in device map + if (gpuName.isDefined && gpuDevice.isEmpty) { + throw new IllegalArgumentException(s"Unsupported GPU device: ${gpuName.get}. " + + s"Supported GPU devices are: ${GpuDevice.deviceMap.keys.mkString(", ")}.") + } val platform = createPlatformInstance(platformName, gpuDevice) logInfo(s"Using platform: $platform") platform diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala index 29b9ece3d..7ffd01872 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet} import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser -import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults} +import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults} import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer import org.apache.spark.sql.execution.SparkPlanInfo @@ -342,6 +342,58 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap } } } + + /** + * Generate the stage level metrics for the SQL plan including GPU metrics if applicable. 
+ * Along with Spark defined metrics, below is the list of GPU metrics that are collected if they + * are present in the eventlog: + * gpuSemaphoreWait, gpuRetryCount, gpuSplitAndRetryCount, gpuRetryBlockTime, + * gpuRetryComputationTime, gpuSpillToHostTime, gpuSpillToDiskTime, gpuReadSpillFromHostTime, + * gpuReadSpillFromDiskTime + * + * @return a sequence of AccumProfileResults + */ + def generateStageLevelAccums(): Seq[AccumProfileResults] = { + + def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = { + // drop the metrics if there are no values + if (updates.isEmpty) { + None + } else if (updates.length == 1) { + Some(StatisticsMetrics(0L, 0L, 0L, updates.sum)) + } else { + Some(StatisticsMetrics( + min = updates.head, + med = updates(updates.size / 2), + max = updates.last, + total = updates.sum + )) + } + } + + // Process taskStageAccumMap to get all the accumulators + val stageLevelAccums = app.taskStageAccumMap.values.flatten + val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId) + groupedByAccumulatorId.flatMap { case (accumulatorId, accums) => + // Collect the available update values (missing ones are skipped) and sort them + val sortedUpdates = accums.flatMap(_.update).toSeq.sorted + + // Compute the statistics for the accumulator if applicable + computeStatistics(sortedUpdates).map { stats => + val sampleAccum = accums.head + AccumProfileResults( + appIndex = appIndex, + stageId = sampleAccum.stageId.toString, + accumulatorId = accumulatorId, + name = sampleAccum.name.getOrElse("Unknown"), + min = stats.min, + median = stats.med, + max = stats.max, + total = stats.total + ) + } + }.toSeq + } } object AppSQLPlanAnalyzer { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala index a5692c261..6a5b197db 100644 ---
a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala @@ -26,6 +26,7 @@ case class ApplicationSummaryInfo( rapidsProps: Seq[RapidsPropertyProfileResult], rapidsJar: Seq[RapidsJarProfileResult], sqlMetrics: Seq[SQLAccumProfileResults], + stageMetrics: Seq[AccumProfileResults], jobAggMetrics: Seq[JobAggTaskMetricsProfileResult], stageAggMetrics: Seq[StageAggTaskMetricsProfileResult], sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult], diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index 44d566236..ba2c45dbf 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -84,6 +84,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { ProfSQLPlanMetricsView.getRawView(apps) } + // Print all Stage level Metrics + def getStageLevelMetrics: Seq[AccumProfileResults] = { + ProfStageMetricView.getRawView(apps) + } + /** * This function is meant to clean up Delta log execs so that you could align * SQL ids between CPU and GPU eventlogs. 
It attempts to remove any delta log @@ -103,6 +108,12 @@ object CollectInformation extends Logging { } } + def generateStageLevelAccums(apps: Seq[ApplicationInfo]): Seq[AccumProfileResults] = { + apps.flatMap { app => + app.planMetricProcessor.generateStageLevelAccums() + } + } + def printSQLPlans(apps: Seq[ApplicationInfo], outputDir: String): Unit = { for (app <- apps) { val planFileWriter = new ToolTextFileWriter(s"$outputDir/${app.appId}", diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index e6d83c27e..d6e369ef8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -219,6 +219,23 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long, } } +case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String, + min: Long, median: Long, max: Long, total: Long) extends ProfileResult { + override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min", + "median", "max", "total") + + override def convertToSeq: Seq[String] = { + Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString, + max.toString, total.toString) + } + + override def convertToCSVSeq: Seq[String] = { + Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString, + StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString, + total.toString) + } +} + case class ResourceProfileInfoCase( val resourceProfileId: Int, val executorResources: Map[String, ExecutorResourceRequest], diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index dd8aa5470..c2eaa6afe 100644 
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -329,6 +329,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea val systemProps = collect.getSystemProperties val rapidsJar = collect.getRapidsJARInfo val sqlMetrics = collect.getSQLPlanMetrics + val stageMetrics = collect.getStageLevelMetrics val wholeStage = collect.getWholeStageCodeGenMapping // for compare mode we just add in extra tables for matching across applications // the rest of the tables simply list all applications specified @@ -392,7 +393,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]") (ApplicationSummaryInfo(appInfo, dsInfo, collect.getExecutorInfo, collect.getJobInfo, rapidsProps, - rapidsJar, sqlMetrics, analysis.jobAggs, analysis.stageAggs, + rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs, analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew, failedTasks, failedStages, failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo, @@ -471,6 +472,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea combineProps("rapids", appsSum).sortBy(_.key), appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex), appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex), + appsSum.flatMap(_.stageMetrics).sortBy(_.appIndex), appsSum.flatMap(_.jobAggMetrics).sortBy(_.appIndex), appsSum.flatMap(_.stageAggMetrics).sortBy(_.appIndex), appsSum.flatMap(_.sqlTaskAggMetrics).sortBy(_.appIndex), @@ -513,6 +515,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea Some(ProfRapidsJarView.getDescription)) profileOutputWriter.write(ProfSQLPlanMetricsView.getLabel, app.sqlMetrics, Some(ProfSQLPlanMetricsView.getDescription)) 
+ profileOutputWriter.write(ProfStageMetricView.getLabel, app.stageMetrics, + Some(ProfStageMetricView.getDescription)) profileOutputWriter.write(ProfSQLCodeGenView.getLabel, app.wholeStage, Some(ProfSQLCodeGenView.getDescription)) comparedRes.foreach { compareSum => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index e73426a68..eeba440a5 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -123,10 +123,19 @@ class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance( private def readOperatorsScore: Map[String, Double] = { speedupFactorFile match { case None => - logInfo(s"Reading operators scores with platform: $platform") + logInfo(s"Trying to read operators scores with platform: $platform") val file = platform.getOperatorScoreFile - val source = Source.fromResource(file) - readOperators(source, "score", true).map(x => (x._1, x._2.toDouble)) + try { + val source = Source.fromResource(file) + readOperators(source, "score", true).map(x => (x._1, x._2.toDouble)) + } catch { + case NonFatal(_) => + val defaultFile = platform.getDefaultOperatorScoreFile + logWarning(s"Unable to read operator scores from file: $file. 
" + + s"Using default operator scores file: $defaultFile.") + val source = Source.fromResource(defaultFile) + readOperators(source, "score", true).map(x => (x._1, x._2.toDouble)) + } case Some(file) => logInfo(s"Reading operators scores from custom speedup factor file: $file") try { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 06bdc4ad7..34c028bde 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -142,9 +142,9 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* descr = "Applications which a particular user has submitted." ) val htmlReport : ScallopOption[Boolean] = toggle("html-report", - default = Some(true), + default = Some(false), prefix = "no-", - descrYes = "Generates an HTML Report. Enabled by default.", + descrYes = "Generates an HTML Report. 
Disabled by default.", descrNo = "Disables generating the HTML report.") val perSql : ScallopOption[Boolean] = opt[Boolean](required = false, diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 1ddffb598..ab09bce45 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids.tool.qualification +import scala.util.control.NonFatal + import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory} import com.nvidia.spark.rapids.tool.tuning.TunerContext @@ -66,8 +68,8 @@ object QualificationMain extends Logging { val platform = try { PlatformFactory.createInstance(appArgs.platform()) } catch { - case ie: IllegalStateException => - logError("Error creating the platform", ie) + case NonFatal(e) => + logError("Error creating the platform", e) return (1, Seq[QualificationSummaryInfo]()) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala new file mode 100644 index 000000000..e343ee7ea --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.views + +import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait} +import com.nvidia.spark.rapids.tool.profiling.AccumProfileResults + +import org.apache.spark.sql.rapids.tool.AppBase +import org.apache.spark.sql.rapids.tool.annotation.Since +import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo + +trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] { + override def getLabel: String = "Stage Level All Metrics" + override def getDescription: String = "Stage Level Metrics" + + override def sortView( + rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = { + rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId)) + } +} + +@Since("24.06.2") +object ProfStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait { + + override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = { + app match { + case app: ApplicationInfo => + app.planMetricProcessor.generateStageLevelAccums() + case _ => Seq.empty + } + } +} + +object QualStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait { + // Keep for the following refactor to customize the view based on the app type (Qual/Prof) + override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = { + Seq.empty + } + + def getRawViewFromSqlProcessor( + sqlAnalyzer: AppSQLPlanAnalyzer): Seq[AccumProfileResults] = { + sortView(sqlAnalyzer.generateStageLevelAccums()) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala index 61c78c470..27455d9e9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala +++ 
b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala @@ -75,6 +75,9 @@ object QualRawReportGenerator { pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app))) pWriter.write(QualAppJobView.getLabel, QualAppJobView.getRawView(Seq(app))) generateSQLProcessingView(pWriter, sqlPlanAnalyzer) + pWriter.write(QualStageMetricView.getLabel, + QualStageMetricView.getRawViewFromSqlProcessor(sqlPlanAnalyzer), + Some(QualStageMetricView.getDescription)) pWriter.write(RapidsQualPropertiesView.getLabel, RapidsQualPropertiesView.getRawView(Seq(app)), Some(RapidsQualPropertiesView.getDescription)) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 92d0f520b..7fd345e64 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids.tool.profiling import java.io.File +import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths, StandardOpenOption} import scala.collection.mutable.ArrayBuffer @@ -202,6 +203,50 @@ class ApplicationInfoSuite extends FunSuite with Logging { ToolTestUtils.compareDataFrames(df, dfExpect) } + test("test GpuMetrics in eventlog") { + TrampolineUtil.withTempDir { outputDir => + TrampolineUtil.withTempDir { tmpEventLogDir => + val eventLogFilePath = Paths.get(tmpEventLogDir.getAbsolutePath, "gpu_metrics_eventlog") + // scalastyle:off line.size.limit + val eventLogContent = + """{"Event":"SparkListenerLogStart","Spark Version":"3.2.1"} + |{"Event":"SparkListenerApplicationStart","App Name":"GPUMetrics", "App ID":"local-16261043003", "Timestamp":123456, "User":"User1"} + |{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End 
Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}""".stripMargin + // scalastyle:on line.size.limit + Files.write(eventLogFilePath, eventLogContent.getBytes(StandardCharsets.UTF_8)) + + val profileArgs = Array( + "--output-directory", outputDir.getAbsolutePath, + eventLogFilePath.toString + ) + + val appArgs = new ProfileArgs(profileArgs) + val (exit, _) = ProfileMain.mainInternal(appArgs) + assert(exit == 0) + + val apps = ArrayBuffer[ApplicationInfo]() + var index = 1 + + val eventLogPaths = appArgs.eventlog() + eventLogPaths.foreach { path => + val eventLogInfo = EventLogPathProcessor.getEventLogInfo(path, hadoopConf).head._1 + apps += new ApplicationInfo(hadoopConf, eventLogInfo, index) + index += 1 + } + assert(apps.size == 1) + + val collect = new CollectInformation(apps) + val gpuMetrics = collect.getStageLevelMetrics + + // Sample eventlog has 3 gpu metrics, gpuSemaphoreWait, + // gpuSpillToHostTime, gpuSplitAndRetryCount + assert(gpuMetrics.size == 3) + val gpuSemaphoreWait = gpuMetrics.find(_.name == "gpuSemaphoreWait") + assert(gpuSemaphoreWait.isDefined) + } + } + } + test("test printSQLPlans") { TrampolineUtil.withTempDir { tempOutputDir => val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() @@ -790,7 +835,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare 
the number of files generated - assert(dotDirs.length === 19) + assert(dotDirs.length === 20) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -824,7 +869,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 15) + assert(dotDirs.length === 16) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -861,7 +906,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 19) + assert(dotDirs.length === 20) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -898,7 +943,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 17) + assert(dotDirs.length === 18) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly diff --git a/data_validation/pyproject.toml b/data_validation/pyproject.toml index 3822878c0..0516a2b98 100644 --- a/data_validation/pyproject.toml +++ b/data_validation/pyproject.toml @@ -24,9 +24,9 @@ dependencies = [ "importlib-resources==5.10.2", "requests==2.32.2", "packaging==23.0", - "certifi==2023.7.22", + "certifi==2024.7.4", "idna==3.7", - "urllib3==1.26.18", + "urllib3==1.26.19", "beautifulsoup4==4.11.2" ] dynamic=["entry-points", "version"] diff --git a/user_tools/README.md b/user_tools/README.md index ab0e2c456..c61d07c7f 100644 --- a/user_tools/README.md +++ b/user_tools/README.md @@ -17,7 +17,7 @@ The wrapper improves end-user experience within the following dimensions: ## Getting started -Set up a Python environment with a version between 3.8 and 3.10 +Set up a 
Python environment with a version between 3.8 and 3.11 1. Run the project in a virtual environment. Note, .venv is the directory created to put the virtual env in, so modify if you want a different location. @@ -80,7 +80,7 @@ Set up a Python environment similar to the steps above. Please refer to [spark-rapids-user-tools guide](https://github.com/NVIDIA/spark-rapids-tools/blob/main/user_tools/docs/index.md) for details on how to use the tools and the platform. -Please refer to [qualx guide](docs/qualx.md) for details on how to use the QualX tool for prediction and training. +Please refer to [qualx guide](https://github.com/NVIDIA/spark-rapids-tools/blob/main/user_tools/docs/qualx.md) for details on how to use the QualX tool for prediction and training. ## What's new diff --git a/user_tools/docs/qualx.md b/user_tools/docs/qualx.md index 0183c7ce7..a540a08cb 100644 --- a/user_tools/docs/qualx.md +++ b/user_tools/docs/qualx.md @@ -128,3 +128,20 @@ Once satisfied with the model, we can just overwrite an existing pre-trained mod cp onprem.json user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost ``` +### Fine-tuning / Incremental Training + +To continue training an existing pre-trained model on new data, just set up the new dataset per above and then +reference the existing base model when training: +```bash +spark_rapids train \ +--dataset datasets \ +--model onprem.json \ +--base_model user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/onprem.json \ +--output_folder train_output +``` + +Evaluate the new model's performance, and when ready, overwrite the existing pre-trained model to use it as a +drop-in replacement: +```bash +cp onprem.json user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost +``` \ No newline at end of file diff --git a/user_tools/docs/user-tools-aws-emr.md b/user_tools/docs/user-tools-aws-emr.md index f2283c845..c362cd336 100644 --- a/user_tools/docs/user-tools-aws-emr.md +++ 
b/user_tools/docs/user-tools-aws-emr.md @@ -26,7 +26,7 @@ the applications running on AWS EMR. ### 3.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/docs/user-tools-databricks-aws.md b/user_tools/docs/user-tools-databricks-aws.md index c0c3f9335..255b6be7d 100644 --- a/user_tools/docs/user-tools-databricks-aws.md +++ b/user_tools/docs/user-tools-databricks-aws.md @@ -28,7 +28,7 @@ The tool currently only supports event logs stored on S3 (no DBFS paths). The re ### 4.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/docs/user-tools-databricks-azure.md b/user_tools/docs/user-tools-databricks-azure.md index d761ad06a..6939a048d 100644 --- a/user_tools/docs/user-tools-databricks-azure.md +++ b/user_tools/docs/user-tools-databricks-azure.md @@ -33,7 +33,7 @@ The tool currently only supports event logs stored on ABFS ([Azure Blob File Sys ### 4.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/docs/user-tools-dataproc-gke.md b/user_tools/docs/user-tools-dataproc-gke.md index fc64664f7..0ff2b35ee 100644 --- a/user_tools/docs/user-tools-dataproc-gke.md +++ b/user_tools/docs/user-tools-dataproc-gke.md @@ -29,7 +29,7 @@ the applications running on _Google Cloud Dataproc GKE_. 
### 3.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/docs/user-tools-dataproc.md b/user_tools/docs/user-tools-dataproc.md index 00ce08a17..279dea96a 100644 --- a/user_tools/docs/user-tools-dataproc.md +++ b/user_tools/docs/user-tools-dataproc.md @@ -29,7 +29,7 @@ the applications running on _Google Cloud Dataproc_. ### 3.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/docs/user-tools-onprem.md b/user_tools/docs/user-tools-onprem.md index 45bc122d4..82f0fb7d4 100644 --- a/user_tools/docs/user-tools-onprem.md +++ b/user_tools/docs/user-tools-onprem.md @@ -17,7 +17,7 @@ The tool currently only supports event logs stored on local path. 
The remote out ### 2.Install the package -- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: +- Install `spark-rapids-user-tools` with python [3.8, 3.11] using: - pip: `pip install spark-rapids-user-tools` - wheel-file: `pip install ` - from source: `pip install -e .` diff --git a/user_tools/pyproject.toml b/user_tools/pyproject.toml index 7e972d8b9..c6cf81795 100644 --- a/user_tools/pyproject.toml +++ b/user_tools/pyproject.toml @@ -32,9 +32,9 @@ dependencies = [ "importlib-resources==5.10.2", "requests==2.31.0", "packaging>=23.0", - "certifi==2023.7.22", + "certifi==2024.7.4", "idna==3.4", - "urllib3==1.26.14", + "urllib3==1.26.19", "beautifulsoup4==4.11.2", "pygments==2.15.0", # used to apply validator on objects and models diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 086cd15b4..d54a32f2a 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -907,8 +907,8 @@ def process_df_for_stdout(raw_df): try: df = self.__update_apps_with_prediction_info(df) except Exception as e: # pylint: disable=broad-except - self.logger.warning('Unable to use XGBoost estimation model for speed ups. ' - 'Falling-back to default model. Reason - %s:%s', type(e).__name__, e) + self.logger.error('Unable to use XGBoost estimation model for speed ups. ' + 'Falling-back to default model. 
Reason - %s:%s', type(e).__name__, e) estimation_model_col = self.ctxt.get_value('local', 'output', 'predictionModel', 'updateResult', 'estimationModelColumn') if estimation_model_col not in df: diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualx/train.py b/user_tools/src/spark_rapids_pytools/rapids/qualx/train.py index 1cc184cde..088c375f6 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualx/train.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualx/train.py @@ -37,6 +37,7 @@ class Train(QualXTool): dataset: str = None model: str = None n_trials: int = None + base_model: str = None name = 'train' @@ -45,7 +46,13 @@ def _run_rapids_tool(self): Runs the QualX train tool, saves the trained model and training results. """ try: - train(dataset=self.dataset, model=self.model, output_dir=self.output_folder, n_trials=self.n_trials) + train( + dataset=self.dataset, + model=self.model, + output_dir=self.output_folder, + n_trials=self.n_trials, + base_model=self.base_model, + ) self.logger.info('Training completed successfully.') self.logger.info('Trained XGBoost model is saved at: %s', self.model) self.logger.info('Training results are generated at: %s', self.output_folder) diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/combined.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/combined.json.cfg new file mode 100644 index 000000000..dd8e91dbc --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/combined.json.cfg @@ -0,0 +1 @@ 
+{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"97"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.0450047925","gamma":"0.00790312421","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.0450047925","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"7","max_leaves":"0","min_child_weight":"5","min_split_loss":"0.00790312421","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.710851729"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"7.1804976E-1","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-aws.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-aws.json.cfg new file mode 100644 index 000000000..d97ecc6f2 --- /dev/null +++ 
b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-aws.json.cfg @@ -0,0 +1 @@ +{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"93"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.0451330915","gamma":"0.043721877","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.0451330915","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"6","max_leaves":"0","min_child_weight":"3","min_split_loss":"0.043721877","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.705104232"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"1.9250162E-1","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-azure.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-azure.json.cfg new file mode 100644 index 
000000000..7aaf42527 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/databricks-azure.json.cfg @@ -0,0 +1 @@ +{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"72"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.0462181643","gamma":"0.0588796698","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.0462181643","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"6","max_leaves":"0","min_child_weight":"4","min_split_loss":"0.0588796698","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.697218001"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"6.1622936E-1","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/dataproc.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/dataproc.json.cfg new file 
mode 100644 index 000000000..84c0cafc2 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/dataproc.json.cfg @@ -0,0 +1 @@ +{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"98"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.0439748727","gamma":"0.00382978795","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.0439748727","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"6","max_leaves":"0","min_child_weight":"3","min_split_loss":"0.00382978795","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.731714725"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"9.0527546E-1","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/emr.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/emr.json.cfg new 
file mode 100644 index 000000000..e3e01aa24 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/emr.json.cfg @@ -0,0 +1 @@ +{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"93"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.044250112","gamma":"0.0288002621","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.044250112","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"6","max_leaves":"0","min_child_weight":"5","min_split_loss":"0.0288002621","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.691695809"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"9.625077E-2","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/onprem.json.cfg b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/onprem.json.cfg new 
file mode 100644 index 000000000..7f3862746 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/resources/qualx/models/xgboost/onprem.json.cfg @@ -0,0 +1 @@ +{"learner":{"generic_param":{"device":"cpu","fail_on_invalid_gpu_id":"0","n_jobs":"0","nthread":"0","random_state":"0","seed":"0","seed_per_iteration":"0","validate_parameters":"1"},"gradient_booster":{"gbtree_model_param":{"num_parallel_tree":"1","num_trees":"97"},"gbtree_train_param":{"process_type":"default","tree_method":"auto","updater":"grow_quantile_histmaker","updater_seq":"grow_quantile_histmaker"},"name":"gbtree","specified_updater":false,"tree_train_param":{"alpha":"0","cache_opt":"1","colsample_bylevel":"1","colsample_bynode":"1","colsample_bytree":"1","eta":"0.047951851","gamma":"0.0653549582","grow_policy":"depthwise","interaction_constraints":"","lambda":"1","learning_rate":"0.047951851","max_bin":"256","max_cat_threshold":"64","max_cat_to_onehot":"4","max_delta_step":"0","max_depth":"8","max_leaves":"0","min_child_weight":"3","min_split_loss":"0.0653549582","monotone_constraints":"()","refresh_leaf":"1","reg_alpha":"0","reg_lambda":"1","sampling_method":"uniform","sketch_ratio":"2","sparse_threshold":"0.20000000000000001","subsample":"0.703936696"},"updater":[{"hist_train_param":{"debug_synchronize":"0","max_cached_hist_node":"65536"},"name":"grow_quantile_histmaker"}]},"learner_model_param":{"base_score":"9.229671E-1","boost_from_average":"1","num_class":"0","num_feature":"108","num_target":"1"},"learner_train_param":{"booster":"gbtree","disable_default_eval_metric":"0","multi_strategy":"one_output_per_tree","objective":"reg:squarederror"},"metrics":[{"name":"mae"},{"name":"mape"}],"objective":{"name":"reg:squarederror","reg_loss_param":{"scale_pos_weight":"1"}}},"version":[2,0,3]} \ No newline at end of file diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index 1906c7b56..2bf73ca30 100644 --- 
a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -652,6 +652,7 @@ class TrainUserArgModel(AbsToolUserArgModel): dataset: str = None model: Optional[str] = None n_trials: Optional[int] = None + base_model: Optional[str] = None def build_tools_args(self) -> dict: runtime_platform = CspEnv.fromstring(self.platform) @@ -661,5 +662,6 @@ def build_tools_args(self) -> dict: 'model': self.model, 'output_folder': self.output_folder, 'n_trials': self.n_trials, + 'base_model': self.base_model, 'platformOpts': {}, } diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index a0cfe0fdc..abe39faef 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -69,7 +69,8 @@ def qualification(self, Skipping this argument requires that the cluster argument points to a valid cluster name on the CSP. - :param cluster: Name or ID (for databricks platforms) of cluster or path to cluster-properties. + :param cluster: The CPU cluster on which the Spark application(s) were executed. + Name or ID (for databricks platforms) of cluster or path to cluster-properties. :param platform: defines one of the following "onprem", "emr", "dataproc", "dataproc-gke", "databricks-aws", and "databricks-azure". :param target_platform: Cost savings and speedup recommendation for comparable cluster in @@ -257,7 +258,8 @@ def train(self, dataset: str = None, model: str = None, output_folder: str = None, - n_trials: int = 200): + n_trials: int = 200, + base_model: str = None): """The train cmd trains an XGBoost model on the input data to estimate the speedup of a Spark CPU application. @@ -265,6 +267,7 @@ def train(self, :param model: Path to save the trained XGBoost model. :param output_folder: Path to store the output. :param n_trials: Number of trials for hyperparameter search. 
+ :param base_model: Path to pre-trained XGBoost model to continue training from. """ # Since train is an internal tool with frequent output, we enable debug mode by default ToolLogging.enable_debug_mode() @@ -275,13 +278,15 @@ def train(self, dataset=dataset, model=model, output_folder=output_folder, - n_trials=n_trials) + n_trials=n_trials, + base_model=base_model) tool_obj = Train(platform_type=train_args['runtimePlatform'], dataset=dataset, model=model, output_folder=output_folder, n_trials=n_trials, + base_model=base_model, wrapper_options=train_args) tool_obj.launch() diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/model.py b/user_tools/src/spark_rapids_tools/tools/qualx/model.py index 028379d90..d02128fb0 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/model.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/model.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from typing import Callable, Optional, List, Tuple import numpy as np import pandas as pd @@ -21,7 +22,7 @@ from spark_rapids_tools.tools.qualx.preprocess import expected_raw_features from spark_rapids_tools.tools.qualx.util import get_logger, INTERMEDIATE_DATA_ENABLED from tabulate import tabulate -from xgboost import XGBModel +from xgboost import Booster, XGBModel # Import optional packages try: import optuna @@ -60,6 +61,7 @@ def train( feature_cols: List[str], label_col: str, n_trials: int = 200, + base_model: Optional[Booster] = None, ) -> XGBModel: """Train model on preprocessed data.""" if 'split' not in cpu_aug_tbl.columns: @@ -101,28 +103,43 @@ def train( y_tune = cpu_aug_tbl.loc[cpu_aug_tbl['split'] != 'test', label_col] dtune = xgb.DMatrix(X_tune, y_tune) - best_params = tune_hyperparameters(X_tune, y_tune, n_trials) - logger.info(best_params) - - # train model w/ best hyperparameters using data splits - base_params = { - 'random_state': 0, - 'objective': 'reg:squarederror', - 
'eval_metric': ['mae', 'mape'], # applied to eval_set/test_data if provided - 'booster': 'gbtree', - } - xgb_params = {**base_params, **best_params} - xgb_params.pop('n_estimators') + if base_model: + # use hyper-parameters from base model (w/ modifications to learning rate and num trees) + xgb_params = {} + cfg = json.loads(base_model.save_config()) + train_params = cfg['learner']['gradient_booster']['tree_train_param'] + xgb_params['eta'] = float(train_params['eta']) / 10.0 # decrease learning rate + xgb_params['gamma'] = float(train_params['gamma']) + xgb_params['max_depth'] = int(train_params['max_depth']) + xgb_params['min_child_weight'] = int(train_params['min_child_weight']) + xgb_params['subsample'] = float(train_params['subsample']) + n_estimators = cfg['learner']['gradient_booster']['gbtree_model_param']['num_trees'] + n_estimators = int(float(n_estimators) * 1.1) # increase n_estimators + else: + # use optuna hyper-parameter tuning + best_params = tune_hyperparameters(X_tune, y_tune, n_trials) + logger.info(best_params) + + # train model w/ best hyperparameters using data splits + base_params = { + 'random_state': 0, + 'objective': 'reg:squarederror', + 'eval_metric': ['mae', 'mape'], # applied to eval_set/test_data if provided + 'booster': 'gbtree', + } + xgb_params = {**base_params, **best_params} + n_estimators = xgb_params.pop('n_estimators') # train model evals_result = {} xgb_model = xgb.train( xgb_params, dtrain=dtune, - num_boost_round=best_params['n_estimators'], + num_boost_round=n_estimators, evals=[(dtrain, 'train'), (dval, 'val')], verbose_eval=50, evals_result=evals_result, + xgb_model=base_model, ) return xgb_model @@ -207,7 +224,7 @@ def predict( def extract_model_features( - df: pd.DataFrame, split_fn: Callable[[pd.DataFrame], pd.DataFrame] = None + df: pd.DataFrame, split_fn: List[Callable[[pd.DataFrame], pd.DataFrame]] = [] ) -> Tuple[pd.DataFrame, List[str], str]: """Extract model features from raw features.""" missing = 
expected_raw_features - set(df.columns) @@ -303,8 +320,8 @@ def extract_model_features( feature_cols = [c for c in feature_cols if c not in extra] # add train/val/test split column, if split function provided - if split_fn: - cpu_aug_tbl = split_fn(cpu_aug_tbl) + for fn in split_fn: + cpu_aug_tbl = fn(cpu_aug_tbl) return cpu_aug_tbl, feature_cols, label_col diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index b402dcd08..fdcbfa878 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -374,8 +374,7 @@ def infer_app_meta(eventlogs: List[str]) -> Mapping[str, Mapping]: # run any plugin hooks on profile_df for p in plugins: plugin = load_plugin(p) - if plugin: - profile_df = plugin.load_profiles_hook(profile_df) + profile_df = plugin.load_profiles_hook(profile_df) return profile_df diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py index d038c5b34..b36b4ec24 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py @@ -43,6 +43,7 @@ get_cache_dir, get_logger, get_dataset_platforms, + load_plugin, print_summary, print_speedup_summary, run_qualification_tool, @@ -194,7 +195,7 @@ def _predict( else 'raw' ) logger.info(f'Predicting dataset ({filter_str}): {dataset}') - features, feature_cols, label_col = extract_model_features(input_df, split_fn) + features, feature_cols, label_col = extract_model_features(input_df, [split_fn]) # note: dataset name is already stored in the 'appName' field try: results = predict_model(xgb_model, features, feature_cols, label_col) @@ -374,6 +375,7 @@ def train( model: Optional[str] = 'xgb_model.json', output_dir: Optional[str] = 'train', n_trials: Optional[int] = 200, + base_model: Optional[str] = None, ): 
"""Train an XGBoost model. @@ -387,6 +389,8 @@ def train( Path to save the trained XGBoost model. n_trials: Number of trials for hyperparameter search. + base_model: + Path to an existing pre-trained model to continue training from. """ datasets, profile_df = load_datasets(dataset) dataset_list = sorted(list(datasets.keys())) @@ -399,13 +403,41 @@ def train( f'Training data contained datasets: {profile_datasets}, expected: {dataset_list}.' ) - features, feature_cols, label_col = extract_model_features(profile_df, split_nds) - xgb_model = train_model(features, feature_cols, label_col, n_trials=n_trials) + split_functions = [split_nds] + for ds_name, ds_meta in datasets.items(): + if 'split_function' in ds_meta: + plugin_path = ds_meta['split_function'] + logger.info(f'Using split function for {ds_name} dataset from plugin: {plugin_path}') + plugin = load_plugin(plugin_path) + split_functions.append(plugin.split_function) + + features, feature_cols, label_col = extract_model_features(profile_df, split_functions) + + xgb_base_model = None + if base_model: + if os.path.exists(base_model): + logger.info(f'Fine-tuning on base model from: {base_model}') + xgb_base_model = xgb.Booster() + xgb_base_model.load_model(base_model) + base_model_cfg = base_model + '.cfg' + if os.path.exists(base_model_cfg): + with open(base_model + '.cfg', 'r') as f: + cfg = f.read() + xgb_base_model.load_config(cfg) + else: + raise ValueError(f'Existing model config not found: {base_model_cfg}') + feature_cols = xgb_base_model.feature_names # align features to base model + else: + raise ValueError(f'Existing model not found for fine-tuning: {base_model}') + xgb_model = train_model(features, feature_cols, label_col, n_trials=n_trials, base_model=xgb_base_model) - # save model + # save model and params ensure_directory(model, parent=True) logger.info(f'Saving model to: {model}') xgb_model.save_model(model) + cfg = xgb_model.save_config() + with open(model + '.cfg', 'w') as f: + f.write(cfg) 
ensure_directory(output_dir) compute_feature_importance(xgb_model, features, feature_cols, output_dir) @@ -442,22 +474,19 @@ def predict( 'app_meta': {}, 'platform': platform, } - # search sub directories for appIds - appIds = find_paths( - metrics_dir, lambda x: RegexPattern.appId.match(x), return_directories=True - ) - appIds = [Path(p).name for p in appIds] - if len(appIds) == 0: + # search sub directories for App IDs + app_ids = [p.name for p in Path(metrics_dir).iterdir() if p.is_dir()] + if len(app_ids) == 0: logger.warning(f'Skipping empty metrics directory: {metrics_dir}') else: try: - for appId in appIds: + for app_id in app_ids: # create dummy app_meta, assuming CPU and scale factor of 1 (for inference) datasets[dataset_name]['app_meta'].update( - {appId: {'runType': 'CPU', 'scaleFactor': 1}} + {app_id: {'runType': 'CPU', 'scaleFactor': 1}} ) - # update the dataset_name for each appId - default_preds_df.loc[default_preds_df['appId'] == appId, 'dataset_name'] = dataset_name + # update the dataset_name for each App ID + default_preds_df.loc[default_preds_df['appId'] == app_id, 'dataset_name'] = dataset_name logger.info(f'Loading dataset {dataset_name}') metrics_df = load_profiles( datasets=datasets, @@ -645,13 +674,21 @@ def evaluate( qual_dir = f'{platform_cache}/qual' ensure_directory(qual_dir) + split_fn = None quals = os.listdir(qual_dir) for ds_name, ds_meta in datasets.items(): if ds_name not in quals: + # run qual tool if needed eventlogs = ds_meta['eventlogs'] for eventlog in eventlogs: eventlog = os.path.expandvars(eventlog) run_qualification_tool(platform, eventlog, f'{qual_dir}/{ds_name}') + if 'split_function' in ds_meta: + # get split_function from plugin + plugin_path = ds_meta['split_function'] + logger.info(f'Using split function for {ds_name} dataset from plugin: {plugin_path}') + plugin = load_plugin(plugin_path) + split_fn = plugin.split_function logger.info('Loading qualification tool CSV files.') node_level_supp, qualtool_output, 
qual_sql_preds, _ = _get_qual_data(qual_dir) @@ -664,7 +701,9 @@ def evaluate( if profile_df.empty: raise ValueError(f'Warning: No profile data found for {dataset}') - split_fn = split_all_test if 'test' in dataset_name else split_nds + if not split_fn: + # use default split_fn if not specified + split_fn = split_all_test if 'test' in dataset_name else split_nds # raw predictions on unfiltered data raw_sql, raw_app = _predict( diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/util.py b/user_tools/src/spark_rapids_tools/tools/qualx/util.py index 8cfbc3a48..8ee16133d 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/util.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/util.py @@ -124,20 +124,28 @@ def get_dataset_platforms(dataset: str) -> Tuple[List[str], str]: dataset: str Path to datasets directory, datasets/platform directory, or datasets/platform/dataset.json file. """ + supported_platforms = [ + 'databricks-aws', + 'databricks-azure', + 'dataproc', + 'emr', + 'onprem' + ] + splits = Path(dataset).parts - platform = splits[-1] - if platform.endswith('.json'): - # dataset JSON, assume parent dir is platform + basename = splits[-1] + if basename.endswith('.json'): + # dataset JSON platforms = [splits[-2]] dataset_base = os.path.join(*splits[:-2]) - elif platform == 'datasets': - # all datasets, assume directory contains platforms + elif basename in supported_platforms: + # platform directory + platforms = [basename] + dataset_base = os.path.join(*splits[:-1]) + else: + # datasets directory platforms = os.listdir(dataset) dataset_base = dataset - else: - # default, last component is platform - platforms = [platform] - dataset_base = os.path.join(*splits[:-1]) return platforms, dataset_base @@ -205,8 +213,7 @@ def load_profiles_hook(df: pd.DataFrame) -> pd.DataFrame: logger.info(f'Successfully loaded plugin: {plugin_path}') return module else: - logger.warning(f'Failed to load plugin: {plugin_path}') - return None + raise 
FileNotFoundError(f'Plugin not found: {plugin_path}') def random_string(length: int) -> str: