Skip to content

Commit

Permalink
Cluster information should handle dynamic allocation and nodes being …
Browse files Browse the repository at this point in the history
…removed and added (#1369)

* Properly handle multi-tenant clusters and dynamic allocation in the
cluster information.

* Fix threading issues with storing cluster information in Platform. Make
platform per app.

---------

Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
tgravescs authored Oct 15, 2024
1 parent eabb6c1 commit 98a862f
Show file tree
Hide file tree
Showing 21 changed files with 277 additions and 169 deletions.
11 changes: 11 additions & 0 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ mvn -Dbuildver=351 clean package

Run `mvn help:all-profiles` to list supported Spark versions.

### Running tests

The unit tests are run by default when building unless they are explicitly skipped by specifying `-DskipTests`.

To run a single test suite, pass its fully qualified class name with the `-Dsuites` option:

```bash
mvn test -Dsuites=com.nvidia.spark.rapids.tool.qualification.QualificationSuite
```


### Setting up an Integrated Development Environment

Before proceeding with importing spark-rapids-tools into IDEA or switching to a different Spark release
Expand Down
130 changes: 87 additions & 43 deletions core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ object PlatformNames {
)
}

/**
 * Dynamic allocation settings of an application.
 *
 * The executor counts are carried as strings rather than numbers;
 * `Platform.getDynamicAllocationSettings` fills them with "N/A" when
 * dynamic allocation is disabled.
 *
 * @param enabled whether dynamic allocation is enabled
 * @param max     maximum number of executors, or "N/A"
 * @param min     minimum number of executors, or "N/A"
 * @param initial initial number of executors, or "N/A"
 */
case class DynamicAllocationInfo(
    enabled: Boolean,
    max: String,
    min: String,
    initial: String)

/**
 * Resource information and name of a CSP instance type. For onprem it is
 * the executor information instead, since we can't recommend node types.
 *
 * @param cores    number of cores
 * @param memoryMB amount of memory, in MB
 * @param name     name of the instance type
 * @param numGpus  number of GPUs
 */
case class InstanceInfo(
    cores: Int,
    memoryMB: Long,
    name: String,
    numGpus: Int)
Expand Down Expand Up @@ -211,7 +213,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def getRetainedSystemProps: Set[String] = Set.empty


def getExecutorHeapMemoryMB(sparkProperties: Map[String, String]): Long = {
// Potentially enhance this to handle if no config then check the executor
// added or resource profile added events for the heap size
Expand Down Expand Up @@ -288,28 +289,20 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
math.max(1, gpus)
}

/**
 * Number of nodes that were used in the source cluster.
 *
 * User-supplied cluster properties win (clamped to at least 1); otherwise the
 * worker-node count inferred from the event log is used; with neither
 * available the answer is 1.
 */
def getSourceNumNodes(): Int = {
  val fromClusterProps = clusterProperties.map(props => Math.max(1, props.system.numWorkers))
  fromClusterProps
    .orElse(clusterInfoFromEventLog.map(_.numWorkerNodes))
    .getOrElse(1)
}

// we want to keep the number of executors used between runs the same
def getNumExecutorInstances(sparkProperties: Map[String, String]): Int = {
val dynamicAllocationEnabled = Platform.isDynamicAllocationEnabled(sparkProperties)
val execInstFromProps = sparkProperties.get("spark.executor.instances")
// If the cluster properties were specified make sure to use those and not
// the eventlog inference. This is broken in my mind but is backwards compatible,
// or maybe use number gpus per node as an improvement.
if (clusterProperties.isDefined) {
val numWorkers = Math.max(1, clusterProperties.get.system.numWorkers)
this.numGpus * numWorkers
} else if (execInstFromProps.isDefined) {
} else if (execInstFromProps.isDefined && !dynamicAllocationEnabled) {
execInstFromProps.get.toInt
} else if (clusterInfoFromEventLog.isDefined) {
val clusterInfo = clusterInfoFromEventLog.get
clusterInfo.numWorkerNodes * clusterInfo.numExecsPerNode
clusterInfoFromEventLog.get.numExecutors
} else {
// not sure so don't set it
0
Expand Down Expand Up @@ -338,24 +331,28 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],

/**
 * Builds the ExistingClusterInfo describing the cluster the application ran on.
 *
 * The driver host is read from `spark.driver.host`, the executor heap size comes from
 * getExecutorHeapMemoryMB, and the dynamic allocation fields come from
 * Platform.getDynamicAllocationSettings.
 *
 * @param coresPerExecutor cores per executor
 * @param numExecsPerNode  executors per node
 * @param numExecs         number of executors
 * @param numWorkerNodes   number of worker nodes
 * @param sparkProperties  Spark properties of the application
 * @param systemProperties system properties of the application; not read by this
 *                         implementation
 * @return the cluster information for this platform
 */
def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
val driverHost = sparkProperties.get("spark.driver.host")
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
// NOTE(review): this first ExistingClusterInfo(...) call (no numExecs, no dynamic
// allocation fields) looks like the removed side of a diff hunk; its value is
// discarded, only the last expression below is returned -- confirm and drop it.
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, driverHost = driverHost)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
// The last expression is the method result: it carries numExecs and the dynamic
// allocation enabled/max/min/initial values.
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs, numWorkerNodes,
executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, driverHost = driverHost)
}

// Set the cluster information for this platform based on what we found in the
// eventlog. The arguments are forwarded to createClusterInfo and the result is
// stored in clusterInfoFromEventLog, replacing any previous value.
def configureClusterInfoFromEventLog(coresPerExecutor: Int,
execsPerNode: Int,
numExecs: Int,
numExecutorNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): Unit = {
clusterInfoFromEventLog = Some(createClusterInfo(coresPerExecutor, execsPerNode,
// NOTE(review): the next two lines look like the removed and added sides of a diff
// hunk (without and with numExecs); only one of them can close the call -- confirm
// which argument list is intended.
numExecutorNodes, sparkProperties, systemProperties))
numExecs, numExecutorNodes, sparkProperties, systemProperties))
}

override def toString: String = {
Expand Down Expand Up @@ -383,17 +380,30 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
*/
def getGPUInstanceTypeRecommendation(
sparkProperties: Map[String, String]): Option[RecommendedClusterInfo] = {
val initialNumExecInstances = getNumExecutorInstances(sparkProperties)
val vendor = clusterInfoFromEventLog.map(_.vendor).getOrElse("")
val numExecs = getNumExecutorInstances(sparkProperties)
// If the cluster properties were specified make sure to use those and not
// the eventlog inference. This is broken in my mind but is backwards compatible,
// or maybe use number gpus per node as an improvement.
val origClusterNumExecsPerNode = clusterInfoFromEventLog.map(_.numExecsPerNode).getOrElse(1)
val numExecsPerNode = if (clusterProperties.isEmpty) {
clusterInfoFromEventLog.map(_.numExecsPerNode).getOrElse(1)
// numExecsPerNode can be -1 if dynamic allocation so just make it 1 for
// this set of calculations. However if we are on a CSP then we want to recommend
// the best size machine so use the number of GPUs as proxy to be the number of executors
// we could put on a node.
if (origClusterNumExecsPerNode == -1) {
maxGpusSupported
} else {
origClusterNumExecsPerNode
}
} else {
1
}
// onprem yarn multi-tenant vs yarn static cluster (dataproc) for just that application
// should be handled automatically unless heterogeneous nodes
val gpusToUse =
Math.max(this.numGpus, Math.min(numExecsPerNode, maxGpusSupported))

// update the global numGpus based on the instance type we are using
this.numGpus = gpusToUse
val nodeCores = if (clusterProperties.isDefined) {
Expand Down Expand Up @@ -424,11 +434,6 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
logWarning("cluster information from event log is missing, executor cores set to 0!")
0
}
val numExecsPerNode = if (clusterInfoFromEventLog.isDefined) {
clusterInfoFromEventLog.get.numExecsPerNode
} else {
1
}
val nodeCoresToUse = execCores * gpusToUse
val nodeMemMB = getMemoryMBPerNode(sparkProperties)
// It's possible if a cpu run was used, it could run with multiple executors, but
Expand All @@ -454,29 +459,37 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
} else {
instanceInfoOpt
}
val numExistingNodes = getSourceNumNodes
// check if instance type supports that number of gpus, if not we add extra executors
val (numExecs, numNodes) = if (finalInstanceInfo.get.numGpus >= numExecsPerNode) {
// TODO - really if instance has more GPUs we should calculate the other way to
// recommend less nodes but leave that open for now
(initialNumExecInstances, numExistingNodes)
} else {
// just flatten to use 1 but we should really see if multiples
val numGpusLeft = numExecsPerNode / finalInstanceInfo.get.numGpus
(initialNumExecInstances, numExistingNodes * numGpusLeft)
}
// note this is going over as for instance if you have 4 gpus per node but only need
// 10 executors, this would tell you to allocate enough to fit 12.
val numNodes = math.ceil(numExecs.toDouble / finalInstanceInfo.get.numGpus).toInt
val coresPerExec = if (finalInstanceInfo.isDefined) {
finalInstanceInfo.get.cores / finalInstanceInfo.get.numGpus
// We may not be able to match instance type up exactly, this means the number of
// cores per executor could come out to be more then the original application.
// For now we want the cores per executor to stay the same as original app so if
// that is set, use it first.
if (clusterInfoFromEventLog.isDefined) {
clusterInfoFromEventLog.get.coresPerExecutor
} else {
finalInstanceInfo.get.cores / finalInstanceInfo.get.numGpus
}
} else {
1
}
val finalNumNodes = if (vendor == PlatformNames.ONPREM) {
// if its onprem we really have no idea of the size of the cluster
-1
} else {
numNodes
}
if (numExecs > 0) {
val vendor = clusterInfoFromEventLog.map(_.vendor).getOrElse("")
val instanceName = finalInstanceInfo.map(_.name).getOrElse("")
val numGpus = finalInstanceInfo.map(_.numGpus).getOrElse(1)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
// Num of executors per node is the number of GPUs
recommendedClusterInfo = Some(RecommendedClusterInfo(vendor, coresPerExec,
numNodes, numGpus, numExecs, gpuDevice = getGpuOrDefault.toString,
finalNumNodes, numGpus, numExecs, gpuDevice = getGpuOrDefault.toString,
dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial,
workerNodeType = Some(instanceName)))
recommendedNodeInstanceInfo = finalInstanceInfo
recommendedClusterInfo
Expand Down Expand Up @@ -505,6 +518,7 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],

override def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
Expand All @@ -514,8 +528,11 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice],
val driverHost = sparkProperties.get("spark.driver.host")
val clusterName = sparkProperties.get(DatabricksParseHelper.PROP_TAG_CLUSTER_NAME_KEY)
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, driverNodeType, workerNodeType, driverHost, clusterId, clusterName)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs, numWorkerNodes,
executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, driverNodeType,
workerNodeType, driverHost, clusterId, clusterName)
}
}

Expand Down Expand Up @@ -629,14 +646,18 @@ class EmrPlatform(gpuDevice: Option[GpuDevice],

/**
 * Builds the ExistingClusterInfo for this platform, additionally recording the cluster id.
 *
 * The cluster id is read from the `EMR_CLUSTER_ID` system property and the driver host
 * from `spark.driver.host`; both are optional lookups. The dynamic allocation fields come
 * from Platform.getDynamicAllocationSettings.
 *
 * @param coresPerExecutor cores per executor
 * @param numExecsPerNode  executors per node
 * @param numExecs         number of executors
 * @param numWorkerNodes   number of worker nodes
 * @param sparkProperties  Spark properties of the application
 * @param systemProperties system properties of the application
 * @return the cluster information for this platform
 */
override def createClusterInfo(coresPerExecutor: Int,
numExecsPerNode: Int,
numExecs: Int,
numWorkerNodes: Int,
sparkProperties: Map[String, String],
systemProperties: Map[String, String]): ExistingClusterInfo = {
val clusterId = systemProperties.get("EMR_CLUSTER_ID")
val driverHost = sparkProperties.get("spark.driver.host")
val executorHeapMem = getExecutorHeapMemoryMB(sparkProperties)
// NOTE(review): this first ExistingClusterInfo(...) call (no numExecs, no dynamic
// allocation fields) looks like the removed side of a diff hunk; its value is
// discarded, only the last expression below is returned -- confirm and drop it.
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numWorkerNodes,
executorHeapMem, clusterId = clusterId, driverHost = driverHost)
val dynamicAllocSettings = Platform.getDynamicAllocationSettings(sparkProperties)
// The last expression is the method result: it carries numExecs and the dynamic
// allocation enabled/max/min/initial values.
ExistingClusterInfo(platformName, coresPerExecutor, numExecsPerNode, numExecs,
numWorkerNodes, executorHeapMem, dynamicAllocSettings.enabled, dynamicAllocSettings.max,
dynamicAllocSettings.min, dynamicAllocSettings.initial, clusterId = clusterId,
driverHost = driverHost)
}

override def getInstanceByResources(
Expand Down Expand Up @@ -672,6 +693,29 @@ class OnPremPlatform(gpuDevice: Option[GpuDevice],
override def maxGpusSupported: Int = 1
}

/**
 * Helpers for reading the dynamic allocation settings out of an application's
 * Spark properties.
 */
object Platform {
  /**
   * Returns whether `spark.dynamicAllocation.enabled` is set to true.
   *
   * A missing key counts as disabled. The value is trimmed before it is parsed so
   * that a whitespace-padded value such as " true " is read the same way as "true"
   * instead of failing the whole lookup.
   *
   * @param sparkProperties Spark properties of the application
   * @throws IllegalArgumentException if the trimmed value is neither "true" nor "false"
   *                                  (case-insensitive)
   */
  def isDynamicAllocationEnabled(sparkProperties: Map[String, String]): Boolean = {
    sparkProperties.getOrElse("spark.dynamicAllocation.enabled", "false").trim.toBoolean
  }

  /**
   * Collects the dynamic allocation settings from the Spark properties.
   *
   * When dynamic allocation is enabled:
   *  - max defaults to `Int.MaxValue` when `spark.dynamicAllocation.maxExecutors` is unset,
   *  - min defaults to "0" when `spark.dynamicAllocation.minExecutors` is unset,
   *  - initial falls back from `spark.dynamicAllocation.initialExecutors` to
   *    `spark.executor.instances` and finally to the min value.
   *
   * When it is disabled, max, min and initial are all reported as "N/A".
   *
   * @param sparkProperties Spark properties of the application
   * @return the settings, with the executor counts kept as strings
   */
  def getDynamicAllocationSettings(sparkProperties: Map[String, String]): DynamicAllocationInfo = {
    val dynamicAllocationEnabled = isDynamicAllocationEnabled(sparkProperties)
    if (dynamicAllocationEnabled) {
      val dynamicAllocationMax = sparkProperties.
        getOrElse("spark.dynamicAllocation.maxExecutors", Int.MaxValue.toString)
      val dynamicAllocationMin = sparkProperties.
        getOrElse("spark.dynamicAllocation.minExecutors", "0")
      // The inner getOrElse is only evaluated when initialExecutors is not set.
      val dynamicAllocationInit = sparkProperties.
        getOrElse("spark.dynamicAllocation.initialExecutors", sparkProperties.
          getOrElse("spark.executor.instances", dynamicAllocationMin))
      DynamicAllocationInfo(dynamicAllocationEnabled, dynamicAllocationMax,
        dynamicAllocationMin, dynamicAllocationInit)
    } else {
      DynamicAllocationInfo(dynamicAllocationEnabled, "N/A", "N/A", "N/A")
    }
  }
}

/**
* Factory for creating instances of different platforms.
* This factory supports various platforms and provides methods for creating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,14 +1393,12 @@ object AutoTuner extends Logging {
}

def buildAutoTuner(
workerInfoFilePath: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.createInstance(clusterProperties = None),
platform: Platform,
driverInfoProvider: DriverLogInfoProvider = BaseDriverLogInfoProvider.noneDriverLog
): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(workerInfoFilePath)
val autoT = new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()),
val autoT = new AutoTuner(platform.clusterProperties.getOrElse(new ClusterProperties()),
singleAppProvider, platform, driverInfoProvider)
autoT
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
if (appInfo.isDefined && appInfo.get.appInfo.head.pluginEnabled) {
val appInfoProvider = AppSummaryInfoBaseProvider.fromAppInfo(appInfo)
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val clusterPropsOpt = loadClusterProps(workerInfoPath)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
PlatformFactory.createInstance(platform, clusterPropsOpt), driverInfoProvider)
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(appInfoProvider,
PlatformFactory.createInstance(appArgs.platform(), clusterPropsOpt), driverInfoProvider)

// The autotuner allows skipping some properties,
// e.g., getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object OpSuppLevel extends Enumeration {
* by the plugin which lists the formats and types supported.
* The class also supports a custom speedup factor file as input.
*/
class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance(),
class PluginTypeChecker(platform: Platform = PlatformFactory.createInstance(),
speedupFactorFile: Option[String] = None) extends Logging {
private val NONE = "None"

Expand Down
Loading

0 comments on commit 98a862f

Please sign in to comment.