Add Arguments for Distributed Mode in Qualification Tool CLI (#1429)
* Add arguments for running tools in distributed mode

Signed-off-by: Partho Sarthi <[email protected]>

* Refactor to use tools config file

Signed-off-by: Partho Sarthi <[email protected]>

* Update specification

Signed-off-by: Partho Sarthi <[email protected]>

* Update tools config file

Signed-off-by: Partho Sarthi <[email protected]>

* Update comment

Signed-off-by: Partho Sarthi <[email protected]>

* Add pylint exception

Signed-off-by: Partho Sarthi <[email protected]>

* Include hdfs output dir in tools config file

Signed-off-by: Partho Sarthi <[email protected]>

* Add comment about assumption of Spark JARs

Signed-off-by: Partho Sarthi <[email protected]>

* Revert changes in stats report

Signed-off-by: Partho Sarthi <[email protected]>

* Submission mode Args

Signed-off-by: Partho Sarthi <[email protected]>

* Modify the arguments structure

Signed-off-by: Partho Sarthi <[email protected]>

* Bump up the API version for tools config file

Signed-off-by: Partho Sarthi <[email protected]>

* Update python arg tests

Signed-off-by: Partho Sarthi <[email protected]>

* Remove pylint disable rule in CSPs

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
parthosa authored Dec 20, 2024
1 parent 6af4bdb commit 6c61e52
Showing 27 changed files with 680 additions and 58 deletions.
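In brief: this commit threads a new `--submission_mode` argument through the Qualification CLI. The value is parsed into a `SubmissionMode` enum and stored in the tool context; every platform gains a `create_distributed_submission_job` factory hook next to the existing local one; a new `RapidsDistributedJob` packages the JAR invocation into `JarCmdArgs` (actual cluster submission still raises `NotImplementedError`); and distributed settings are carried in the tools config file, whose API version is bumped. An illustrative invocation (the exact CLI syntax is an assumption, not shown in this diff) would be `spark_rapids qualification --eventlogs <logs> --platform onprem --submission_mode distributed`.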
@@ -88,6 +88,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DBAzureLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
pass

3 changes: 3 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -130,6 +130,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DataprocLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
pass

def validate_job_submission_args(self, submission_args: dict) -> dict:
pass

3 changes: 3 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
@@ -92,6 +92,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
pass


@dataclass
class DataprocGkeCMDDriver(DataprocCMDDriver):
3 changes: 3 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -115,6 +115,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return EmrLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
pass

def generate_cluster_configuration(self, render_args: dict):
image_version = self.configs.get_value_silent('clusterInference', 'defaultImage')
render_args['IMAGE'] = f'"{image_version}"'
14 changes: 13 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/onprem.py
@@ -19,14 +19,14 @@
from typing import Any, List, Optional

from spark_rapids_tools import CspEnv
-from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob
from spark_rapids_pytools.cloud_api.sp_types import PlatformBase, ClusterBase, ClusterNode, \
CMDDriverBase, ClusterGetAccessor, GpuDevice, \
GpuHWInfo, NodeHWInfo, SparkNodeType, SysInfo
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import StorageDriver
from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider
from spark_rapids_pytools.pricing.price_provider import SavingsEstimator
+from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob, RapidsDistributedJob


@dataclass
@@ -49,6 +49,9 @@ def _install_storage_driver(self):
def create_local_submission_job(self, job_prop, ctxt) -> Any:
return OnPremLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def create_distributed_submission_job(self, job_prop, ctxt) -> RapidsDistributedJob:
return OnPremDistributedRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None, is_inferred: bool = False,
is_props_file: bool = False):
return OnPremCluster(self, is_inferred=is_inferred).set_connection(cluster_id=cluster, props=props)
@@ -154,6 +157,15 @@ class OnPremLocalRapidsJob(RapidsLocalJob):
job_label = 'onpremLocal'


# pylint: disable=abstract-method
@dataclass
class OnPremDistributedRapidsJob(RapidsDistributedJob):
"""
Implementation of a RAPIDS job that runs on a distributed cluster
"""
job_label = 'onprem.distributed'


@dataclass
class OnPremNode(ClusterNode):
"""Implementation of Onprem cluster node."""
5 changes: 4 additions & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -22,11 +22,11 @@
from logging import Logger
from typing import Type, Any, List, Callable, Union, Optional, final, Dict

-from spark_rapids_tools import EnumeratedType, CspEnv
from spark_rapids_pytools.common.prop_manager import AbstractPropertiesContainer, JSONPropertiesContainer, \
get_elem_non_safe
from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator
+from spark_rapids_tools import EnumeratedType, CspEnv


class DeployMode(EnumeratedType):
@@ -884,6 +884,9 @@ def create_saving_estimator(self,
def create_local_submission_job(self, job_prop, ctxt) -> Any:
raise NotImplementedError

def create_distributed_submission_job(self, job_prop, ctxt) -> Any:
raise NotImplementedError

def load_platform_configs(self):
config_file_name = f'{CspEnv.tostring(self.type_id).lower()}-configs.json'
config_path = Utils.resource_path(config_file_name)
16 changes: 14 additions & 2 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -29,7 +29,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
-from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
+from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode
from spark_rapids_tools.storagelib import CspFs
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
@@ -153,6 +153,17 @@ def _process_estimation_model_args(self) -> None:
estimation_model_args = QualEstimationModel.create_default_model_args(selected_model)
self.ctxt.set_ctxt('estimationModelArgs', estimation_model_args)

def _process_submission_mode_arg(self) -> None:
"""
Process the value provided by `--submission_mode` argument.
"""
submission_mode_arg = self.wrapper_options.get('submissionMode')
if submission_mode_arg is None or not submission_mode_arg:
submission_mode = SubmissionMode.get_default()
else:
submission_mode = SubmissionMode.fromstring(submission_mode_arg)
self.ctxt.set_ctxt('submissionMode', submission_mode)

def _process_custom_args(self) -> None:
"""
Qualification tool processes extra arguments:
Expand Down Expand Up @@ -181,6 +192,7 @@ def _process_custom_args(self) -> None:
self._process_estimation_model_args()
self._process_offline_cluster_args()
self._process_eventlogs_args()
self._process_submission_mode_arg()
# This is noise to dump everything
# self.logger.debug('%s custom arguments = %s', self.pretty_name(), self.ctxt.props['wrapperCtx'])
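Note: the new `_process_submission_mode_arg` relies on a `SubmissionMode` enum from `spark_rapids_tools.enums`, which is not shown in this diff. A minimal sketch of its likely shape, assuming it follows the same `EnumeratedType` pattern as `DeployMode` in `sp_types.py` (the members and `get_default` below are inferred, not copied from the source):

```python
# Hypothetical sketch; the real definition lives in spark_rapids_tools/enums.py.
# EnumeratedType (imported in sp_types.py above) provides the fromstring() helper.
from spark_rapids_tools import EnumeratedType


class SubmissionMode(EnumeratedType):
    """Whether the tools JAR runs on the local machine or on a Spark cluster."""
    LOCAL = 'local'
    DISTRIBUTED = 'distributed'

    @classmethod
    def get_default(cls) -> 'SubmissionMode':
        # Local mode preserves the pre-#1429 behavior.
        return cls.LOCAL
```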

Expand Down Expand Up @@ -375,7 +387,7 @@ def create_stdout_table_pprinter(total_apps: pd.DataFrame,

df = self._read_qualification_output_file('summaryReport')
# 1. Operations related to XGboost modelling
-if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
+if not df.empty and self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
try:
df = self.__update_apps_with_prediction_info(df,
self.ctxt.get_ctxt('estimationModelArgs'))
55 changes: 45 additions & 10 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
@@ -17,12 +17,13 @@
import os
from dataclasses import dataclass, field
from logging import Logger
-from typing import List, Optional
+from typing import List, Optional, Union

from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools.storagelib import LocalPath
from spark_rapids_tools_distributed.jar_cmd_args import JarCmdArgs


@dataclass
@@ -38,6 +39,8 @@ def _init_fields(self):
self.props['sparkConfArgs'] = {}
if self.get_value_silent('platformArgs') is None:
self.props['platformArgs'] = {}
if self.get_value_silent('distributedToolsConfigs') is None:
self.props['distributedToolsConfigs'] = {}

def get_jar_file(self):
return self.get_value('rapidsArgs', 'jarFile')
@@ -48,6 +51,9 @@ def get_jar_main_class(self):
def get_rapids_args(self):
return self.get_value('rapidsArgs', 'jarArgs')

def get_distribution_tools_configs(self):
return self.get_value('distributedToolsConfigs')


@dataclass
class RapidsJob:
@@ -90,10 +96,10 @@ def _build_rapids_args(self):
rapids_arguments.extend(extra_rapids_args)
return rapids_arguments

-def _build_submission_cmd(self) -> list:
+def _build_submission_cmd(self) -> Union[list, JarCmdArgs]:
raise NotImplementedError

-def _submit_job(self, cmd_args: list) -> str:
+def _submit_job(self, cmd_args: Union[list, JarCmdArgs]) -> str:
raise NotImplementedError

def _print_job_output(self, job_output: str):
@@ -125,13 +131,6 @@ def run_job(self):
self._cleanup_temp_log4j_files()
return job_output


-@dataclass
-class RapidsLocalJob(RapidsJob):
-"""
-Implementation of a RAPIDS job that runs local on a machine.
-"""

def _get_hadoop_classpath(self) -> Optional[str]:
"""
Gets the Hadoop's configuration directory from the environment variables.
@@ -202,6 +201,13 @@ def _build_jvm_args(self):
vm_args.append(val)
return vm_args


+@dataclass
+class RapidsLocalJob(RapidsJob):
+"""
+Implementation of a RAPIDS job that runs local on a machine.
+"""

def _build_submission_cmd(self) -> list:
# env vars are added later as a separate dictionary
classpath_arr = self._build_classpath()
@@ -218,3 +224,32 @@ def _submit_job(self, cmd_args: list) -> str:
out_std = self.exec_ctxt.platform.cli.run_sys_cmd(cmd=cmd_args,
env_vars=env_args)
return out_std


@dataclass
class RapidsDistributedJob(RapidsJob):
"""
Implementation of a RAPIDS job that runs distributed on a cluster.
"""

def _build_submission_cmd(self) -> JarCmdArgs:
classpath_arr = self._build_classpath()
hadoop_cp = self._get_hadoop_classpath()
jvm_args_arr = self._build_jvm_args()
jar_main_class = self.prop_container.get_jar_main_class()
jar_output_dir_args = self._get_persistent_rapids_args()
extra_rapids_args = self.prop_container.get_rapids_args()
return JarCmdArgs(jvm_args_arr, classpath_arr, hadoop_cp, jar_main_class,
jar_output_dir_args, extra_rapids_args)

def _build_classpath(self) -> List[str]:
"""
Only the Spark RAPIDS Tools JAR file is needed for the classpath.
Assumption: Each worker node should have the Spark Jars pre-installed.
TODO: Ship the Spark JARs to the cluster to avoid version mismatch issues.
"""
return ['-cp', self.prop_container.get_jar_file()]

def _submit_job(self, cmd_args: JarCmdArgs) -> None:
# TODO: Support for submitting the Tools JAR to a Spark cluster
raise NotImplementedError('Distributed job submission is not yet supported')
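For reference, `_build_submission_cmd` above constructs a `JarCmdArgs` positionally from six pieces. The container itself (defined in `spark_rapids_tools_distributed/jar_cmd_args.py`) is not part of this diff; a minimal sketch consistent with that call site might look like the following (the field names are inferred from the arguments, not taken from the source):

```python
# Hypothetical sketch of the JarCmdArgs container; only the field order is
# grounded in the _build_submission_cmd() call above.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class JarCmdArgs:
    jvm_args: List[str]              # JVM launch options, from _build_jvm_args()
    classpath_arr: List[str]         # ['-cp', '<tools-jar-path>']
    hadoop_classpath: Optional[str]  # derived from HADOOP_CONF_DIR; may be None
    jar_main_class: str              # main class inside the RAPIDS tools JAR
    jar_output_dir_args: List[str]   # persistent output-directory arguments
    extra_rapids_args: List[str]     # remaining tool-specific JAR arguments
```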
36 changes: 32 additions & 4 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
@@ -39,6 +39,7 @@
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspEnv
from spark_rapids_tools.configuration.common import RuntimeDependency
from spark_rapids_tools.configuration.submission.distributed_config import DistributedToolsConfig
from spark_rapids_tools.configuration.tools_config import ToolsConfig
from spark_rapids_tools.enums import DependencyType
from spark_rapids_tools.storagelib import LocalPath, CspFs
@@ -608,7 +609,7 @@ def populate_dependency_list() -> List[RuntimeDependency]:
# check if the dependencies is defined in a config file
config_obj = self.get_tools_config_obj()
if config_obj is not None:
-if config_obj.runtime.dependencies:
+if config_obj.runtime and config_obj.runtime.dependencies:
return config_obj.runtime.dependencies
self.logger.info('The ToolsConfig did not specify the dependencies. '
'Falling back to the default dependencies.')
@@ -939,10 +940,33 @@ def _prepare_local_job_arguments(self):
'sparkConfArgs': spark_conf_args,
'platformArgs': platform_args
}
# Set the configuration for the distributed tools
distributed_tools_configs = self._get_distributed_tools_configs()
if distributed_tools_configs:
job_properties_json['distributedToolsConfigs'] = distributed_tools_configs
rapids_job_container = RapidsJobPropContainer(prop_arg=job_properties_json,
file_load=False)
self.ctxt.set_ctxt('rapidsJobContainers', [rapids_job_container])

def _get_distributed_tools_configs(self) -> Optional[DistributedToolsConfig]:
"""
Parse the tools configuration and return as distributed tools configuration object
"""
config_obj = self.get_tools_config_obj()
if config_obj and config_obj.submission:
if self.ctxt.is_distributed_mode():
return config_obj
self.logger.warning(
'Distributed tool configurations detected, but distributed mode is not enabled. '
'Use \'--submission_mode distributed\' flag to enable distributed mode. Switching to local mode.'
)
elif self.ctxt.is_distributed_mode():
self.logger.warning(
'Distributed mode is enabled, but no distributed tool configurations were provided. '
'Using default settings.'
)
return None

def _archive_results(self):
self._archive_local_results()

@@ -961,14 +985,18 @@ def _submit_jobs(self):
executors_cnt = len(rapids_job_containers) if Utilities.conc_mode_enabled else 1
with ThreadPoolExecutor(max_workers=executors_cnt) as executor:
for rapids_job in rapids_job_containers:
-job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job,
-                                                         ctxt=self.ctxt)
+if self.ctxt.is_distributed_mode():
+    job_obj = self.ctxt.platform.create_distributed_submission_job(job_prop=rapids_job,
+                                                                   ctxt=self.ctxt)
+else:
+    job_obj = self.ctxt.platform.create_local_submission_job(job_prop=rapids_job,
+                                                             ctxt=self.ctxt)
futures = executor.submit(job_obj.run_job)
futures_list.append(futures)
try:
for future in concurrent.futures.as_completed(futures_list):
result = future.result()
results.append(result)
except Exception as ex: # pylint: disable=broad-except
-self.logger.error('Failed to download dependencies %s', ex)
+self.logger.error('Failed to submit jobs %s', ex)
raise ex
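The interplay between the tools config file and the new flag in `_get_distributed_tools_configs` reduces to a simple gate: the configuration object is used only when a `submission` section is present and distributed mode is enabled; every other combination logs a warning and returns `None`. A condensed restatement (illustrative only, not new behavior):

```python
# Condensed restatement of the gating in _get_distributed_tools_configs above.
def resolve_distributed_config(config_obj, distributed_mode: bool):
    has_submission = config_obj is not None and config_obj.submission is not None
    if has_submission and distributed_mode:
        return config_obj  # use the distributed tools configuration as-is
    # has_submission without the flag: warn and fall back to local mode.
    # distributed_mode without a config: warn and use default settings.
    return None
```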
7 changes: 7 additions & 0 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
@@ -26,6 +26,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import ToolLogging, Utils
from spark_rapids_tools import CspEnv, CspPath
from spark_rapids_tools.enums import SubmissionMode
from spark_rapids_tools.utils import Utilities


@@ -89,6 +90,12 @@ def get_deploy_mode(self) -> Any:
def is_fat_wheel_mode(self) -> bool:
return self.get_ctxt('fatWheelModeEnabled')

def is_distributed_mode(self) -> bool:
return self.get_ctxt('submissionMode') == SubmissionMode.DISTRIBUTED

def is_local_mode(self) -> bool:
return self.get_ctxt('submissionMode') == SubmissionMode.LOCAL

def set_ctxt(self, key: str, val: Any):
self.props['wrapperCtx'][key] = val

@@ -28,8 +28,7 @@
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
-},
-"type": "jar"
+}
},
{
"name": "AWS Java SDK Bundled",
@@ -40,8 +39,7 @@
"value": "02deec3a0ad83d13d032b1812421b23d7a961eea"
},
"size": 280645251
-},
-"type": "jar"
+}
}
],
"333": [
Expand Down