Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve console output from python tool for failed/gpu/photon event logs #1235

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion user_tools/src/spark_rapids_pytools/common/prop_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class AbstractPropertiesContainer(object):
"""
An abstract class that loads properties (dictionary).
"""
prop_arg: str
prop_arg: Union[str, dict]
file_load: bool = True
props: Any = field(default=None, init=False)

Expand Down
186 changes: 105 additions & 81 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,97 +44,91 @@ class QualificationSummary:
"""
Encapsulates the logic to organize Qualification report.
"""
comments: Any = None
all_apps: pd.DataFrame = None
recommended_apps: pd.DataFrame = None
df_result: pd.DataFrame = None
total_apps: pd.DataFrame = field(init=True) # Total apps, including failed or skipped
tools_processed_apps: pd.DataFrame = None # Apps after tools processing and heuristic filtering
recommended_apps: pd.DataFrame = None # Apps recommended by legacy speedups. TODO: Should use QualX
filter_apps_count: int = field(default=0, init=False) # Count after applying console filters (top candidates)
top_candidates_flag: bool = False
comments: Any = None
sections_generators: List[Callable] = field(default_factory=lambda: [])
filter_apps_count: int = field(default=0, init=False)
conversion_items: dict = field(default_factory=dict)
auto_tuning_path: str = None

def _get_total_durations(self) -> int:
if not self.is_empty():
return self.all_apps['App Duration'].sum()
if self._has_tools_processed_apps():
return self.tools_processed_apps['App Duration'].sum()
return 0

def _get_total_gpu_durations(self) -> int:
if not self.is_empty():
return self.all_apps['Estimated GPU Duration'].sum()
if self._has_tools_processed_apps():
return self.tools_processed_apps['Estimated GPU Duration'].sum()
return 0

def _get_stats_total_cost(self) -> float:
return self.df_result['Estimated App Cost'].sum()
if self._has_tools_processed_apps() and 'Estimated App Cost' in self.tools_processed_apps.columns:
cindyyuanjiang marked this conversation as resolved.
Show resolved Hide resolved
return self.tools_processed_apps['Estimated App Cost'].sum()
return 0.0

def _get_stats_total_gpu_cost(self) -> float:
return self.df_result['Estimated GPU Cost'].sum()
if self._has_tools_processed_apps() and 'Estimated GPU Cost' in self.tools_processed_apps.columns:
cindyyuanjiang marked this conversation as resolved.
Show resolved Hide resolved
return self.tools_processed_apps['Estimated GPU Cost'].sum()
return 0.0

def _get_stats_total_apps(self) -> int:
if not self.is_empty():
return len(self.all_apps)
return len(self.total_apps)

def _get_stats_success_apps(self) -> int:
# Count the rows of total_apps whose 'Status' column equals 'SUCCESS'.
# Falls back to 0 when _has_apps() is False (total_apps is None or an empty DataFrame).
if self._has_apps():
return len(self.total_apps[self.total_apps['Status'] == 'SUCCESS'])
return 0

def _get_stats_recommended_apps(self) -> int:
if self.has_gpu_recommendation():
if self._has_gpu_recommendation():
return len(self.recommended_apps)
return 0

def is_empty(self) -> bool:
if self.all_apps is not None:
return self.all_apps.empty
return True
def _has_apps(self) -> bool:
# True only when total_apps has been set (not None) and is a non-empty DataFrame.
return self.total_apps is not None and not self.total_apps.empty

def has_gpu_recommendation(self) -> bool:
if self.recommended_apps is not None:
return not self.recommended_apps.empty
return False
def _has_tools_processed_apps(self) -> bool:
# True only when tools_processed_apps has been set (not None) and is a non-empty DataFrame.
return self.tools_processed_apps is not None and not self.tools_processed_apps.empty

def has_tabular_result(self) -> bool:
if self.df_result is not None:
return not self.df_result.empty
return False
def _has_gpu_recommendation(self) -> bool:
# True only when recommended_apps has been set (not None) and is a non-empty DataFrame.
return self.recommended_apps is not None and not self.recommended_apps.empty

def generate_report(self,
app_name: str,
wrapper_output_files_info: dict,
csp_report_provider: Callable[[], List[str]] = lambda: [],
df_pprinter: Any = None,
output_pprinter: Any = None):
output_pprinter: Any = None) -> list:
report_content = []
if self.is_empty():
if not self._has_apps():
# Qualification tool has no output
report_content.append(f'{app_name} tool did not generate any valid rows')
report_content.append(f'\n{app_name} tool did not generate any valid rows')
if self.comments:
report_content.append(Utils.gen_multiline_str(self.comments))
return report_content

if output_pprinter is not None:
report_content.append(output_pprinter())

if not self.has_gpu_recommendation():
report_content.append(f'{app_name} tool found no recommendations for GPU.')

if self.has_tabular_result():
for entry in wrapper_output_files_info:
path = wrapper_output_files_info[entry]['path']
output_comment = wrapper_output_files_info[entry]['outputComment']
if path is not None:
abs_path = FSUtil.get_abs_path(path)
if FSUtil.resource_exists(abs_path): # check if the file exists
report_content.append(f' - {output_comment}: {abs_path}')

pretty_df = df_pprinter(self.df_result)
self.filter_apps_count = len(pretty_df)
if pretty_df.empty:
# the results were reduced to no rows because of the filters
report_content.append(
f'{app_name} tool found no qualified applications after applying the filters.\n'
f'See the CSV file for full report or disable the filters.')
# Output files comments should be generated even if there are no apps to show
self._generate_output_files_comments(wrapper_output_files_info, report_content)
if self._get_stats_success_apps() > 0:
if self._has_tools_processed_apps():
# TODO: Rename function to indicate the returned df includes filtered applications
pretty_df = df_pprinter(self.tools_processed_apps)
self.filter_apps_count = len(pretty_df)
if pretty_df.empty:
# the results were reduced to no rows because of the filters
report_content.append(
f'\n{app_name} tool found no qualified applications after applying the filters.\n'
f'See the CSV file for full report or disable the filters.')
else:
report_content.append(tabulate(pretty_df, headers='keys', tablefmt='psql', floatfmt='.2f'))
else:
report_content.append(tabulate(pretty_df, headers='keys', tablefmt='psql', floatfmt='.2f'))
report_content.append(f'\n{app_name} tool found no recommendations for GPU.')
else:
report_content.append(f'{app_name} tool found no records to show.')
report_content.append(f'\n{app_name} tool found no successful applications to process.')

if self.filter_apps_count > 0:
self.comments.append('**Estimated GPU Speedup Category assumes the user is using the node type '
Expand All @@ -150,7 +144,7 @@ def generate_report(self,
for section_generator in self.sections_generators:
if section_generator:
report_content.append(Utils.gen_multiline_str(section_generator()))
if self.has_gpu_recommendation():
if self._has_gpu_recommendation():
csp_report = csp_report_provider()
if csp_report:
report_content.extend(csp_report)
Expand All @@ -162,19 +156,39 @@ def __generate_report_summary(self):
def format_float(x: float) -> str:
return f'{x:.2f}'

report_summary = [['Total applications', self._get_stats_total_apps()]]
report_summary = [['Total applications', self._get_stats_total_apps()],
['Processed applications', self._get_stats_success_apps()]]
if self.top_candidates_flag:
# TODO: Similarly, we should include a line that shows number of apps after filtering for other filter types
report_summary.append(['Top candidates', self.filter_apps_count])
if not self.top_candidates_flag:
overall_speedup = 0.0
total_apps_durations = self._get_total_durations()
total_gpu_durations = self._get_total_gpu_durations()
if total_gpu_durations > 0:
overall_speedup = total_apps_durations / total_gpu_durations
report_summary.append(['Overall estimated speedup', format_float(overall_speedup)])
else:
# TODO: this should be updated to use recommendations from QualX instead of the legacy speedups column
recommended_apps_count = self._get_stats_recommended_apps()
report_summary.append(['Recommended applications', recommended_apps_count])
if recommended_apps_count > 0:
# if there are no RAPIDS candidates, do not display the estimated speedup or cost savings row in console
overall_speedup = 0.0
total_apps_durations = self._get_total_durations()
total_gpu_durations = self._get_total_gpu_durations()
if total_gpu_durations > 0:
overall_speedup = total_apps_durations / total_gpu_durations
report_summary.append(['Overall estimated speedup', format_float(overall_speedup)])
return report_summary

@classmethod
def _generate_output_files_comments(cls, output_files_info: dict, report_content: list) -> None:
"""
Generate comments for the output files to be displayed in the console report.
For every value in output_files_info that has both a 'path' and an 'outputComment', and whose
absolute path passes FSUtil.resource_exists, one line of the form
' - <outputComment>: <absolute path>' is appended. report_content is modified in place and
nothing is returned.
:param output_files_info: Dictionary containing the output files information. Only its values are
read, through .get('path') and .get('outputComment').
:param report_content: List to which the output files comments will be appended.
"""
for entry in output_files_info.values():
path = entry.get('path', None)
output_comment = entry.get('outputComment', None)
# Entries missing either the path or the comment text are silently skipped.
if path is not None and output_comment is not None:
abs_path = FSUtil.get_abs_path(path)
if FSUtil.resource_exists(abs_path): # check if the file exists
report_content.append(f' - {output_comment}: {abs_path}')


@dataclass
class Qualification(RapidsJarTool):
Expand Down Expand Up @@ -364,6 +378,7 @@ def __is_savings_calc_enabled(self) -> bool:
return cost_savings_func_flag and self.ctxt.get_ctxt('enableSavingsCalculations')

def __get_recommended_apps(self, all_rows, selected_cols=None) -> pd.DataFrame:
# TODO: This function should be updated to use speedups from QualX instead of the legacy speedups column
speed_up_col = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport',
'recommendations', 'speedUp', 'columnName')
recommended_vals = self.ctxt.get_value('toolOutput', 'csv', 'summaryReport',
Expand Down Expand Up @@ -585,11 +600,14 @@ def __apply_gpu_cluster_reshape(self, all_apps: pd.DataFrame) -> (pd.DataFrame,

def __build_global_report_summary(self,
all_apps: pd.DataFrame,
total_apps: pd.DataFrame,
unsupported_ops_df: pd.DataFrame,
output_files_raw: dict) -> QualificationSummary:
filter_top_candidate_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.TOP_CANDIDATES
if all_apps.empty:
# No need to run saving estimator or process the data frames.
return QualificationSummary()
return QualificationSummary(total_apps=total_apps,
top_candidates_flag=filter_top_candidate_enabled)

output_files_info = JSONPropertiesContainer(output_files_raw, file_load=False)
unsupported_ops_obj = UnsupportedOpsStageDuration(self.ctxt.get_value('local', 'output',
Expand Down Expand Up @@ -625,14 +643,17 @@ def __build_global_report_summary(self,
# Add columns for cluster configuration recommendations and tuning configurations to the processed_apps.
recommender = ClusterConfigRecommender(self.ctxt)
df_final_result = recommender.add_cluster_and_tuning_recommendations(df_final_result)
# Merge the total_apps with the processed_apps to get the Event Log
df_final_result = pd.merge(df_final_result, total_apps[['Event Log', 'AppID']],
left_on='App ID', right_on='AppID')
# Write the summary metadata
self._write_summary_metadata(df_final_result, output_files_info.get_value('summaryMetadata'),
output_files_info.get_value('configRecommendations'))
return QualificationSummary(comments=report_comments,
all_apps=apps_grouped_df,
return QualificationSummary(total_apps=total_apps,
tools_processed_apps=df_final_result,
recommended_apps=recommended_apps,
df_result=df_final_result,
top_candidates_flag=filter_top_candidate_enabled)
top_candidates_flag=filter_top_candidate_enabled,
comments=report_comments)

def _process_output(self) -> None:
def process_df_for_stdout(raw_df):
Expand Down Expand Up @@ -690,11 +711,7 @@ def process_df_for_stdout(raw_df):
if not self._evaluate_rapids_jar_tool_output_exist():
return

rapids_output_dir = self.ctxt.get_rapids_output_folder()
rapids_summary_file = FSUtil.build_path(rapids_output_dir,
self.ctxt.get_value('toolOutput', 'csv', 'summaryReport', 'fileName'))
self.ctxt.logger.debug('Rapids CSV summary file is located as: %s', rapids_summary_file)
df = pd.read_csv(rapids_summary_file)
df = self._read_qualification_output_file('summaryReport')
# 1. Operations related to XGBoost modelling
if self.ctxt.get_ctxt('estimationModelArgs')['xgboostEnabled']:
try:
Expand All @@ -706,9 +723,7 @@ def process_df_for_stdout(raw_df):

# 2. Operations related to cluster information
try:
cluster_info_file = self.ctxt.get_value('toolOutput', 'csv', 'clusterInformation', 'fileName')
cluster_info_file = FSUtil.build_path(rapids_output_dir, cluster_info_file)
cluster_info_df = pd.read_csv(cluster_info_file)
cluster_info_df = self._read_qualification_output_file('clusterInformation')
# Merge using a left join on 'App Name' and 'App ID'. This ensures `df` includes all cluster
# info columns, even if `cluster_info_df` is empty.
df = pd.merge(df, cluster_info_df, on=['App Name', 'App ID'], how='left')
Expand All @@ -718,15 +733,13 @@ def process_df_for_stdout(raw_df):
self.logger.error('Unable to process cluster information. Cost savings will be disabled. '
'Reason - %s:%s', type(e).__name__, e)

# 3. Operations related to unsupported operators
unsupported_operator_report_file = self.ctxt.get_value('toolOutput', 'csv', 'unsupportedOperatorsReport',
'fileName')
rapids_unsupported_operators_file = FSUtil.build_path(rapids_output_dir, unsupported_operator_report_file)
unsupported_ops_df = pd.read_csv(rapids_unsupported_operators_file)
# 3. Operations related to reading qualification output (unsupported operators and apps status)
unsupported_ops_df = self._read_qualification_output_file('unsupportedOperatorsReport')
apps_status_df = self._read_qualification_output_file('appsStatusReport')

# 4. Operations related to output
output_files_info = self.__build_output_files_info()
report_gen = self.__build_global_report_summary(df, unsupported_ops_df, output_files_info)
report_gen = self.__build_global_report_summary(df, apps_status_df, unsupported_ops_df, output_files_info)
summary_report = report_gen.generate_report(app_name=self.pretty_name(),
wrapper_output_files_info=output_files_info,
csp_report_provider=self._generate_platform_report_sections,
Expand Down Expand Up @@ -898,6 +911,17 @@ def _write_summary_metadata(self, tools_processed_apps: pd.DataFrame,
self.logger.error('Error writing the summary metadata report. Reason - %s:%s',
type(e).__name__, e)

def _read_qualification_output_file(self, report_name_key: str, file_format_key: str = 'csv') -> pd.DataFrame:
"""
Helper method to read a report file from the Scala qualification tool output folder
:param report_name_key: Key in the config file to get the report name
:param file_format_key: Key in the config file to get the file format, default is 'csv'
:return: DataFrame produced by pd.read_csv on the resolved report path
"""
# extract the file name of report from the YAML config (e.g., toolOutput -> csv -> summaryReport -> fileName)
report_file_name = self.ctxt.get_value('toolOutput', file_format_key, report_name_key, 'fileName')
# The report is looked up directly under the rapids output folder reported by the context.
report_file_path = FSUtil.build_path(self.ctxt.get_rapids_output_folder(), report_file_name)
# NOTE(review): file_format_key only selects the config branch above; the file is always parsed
# with pd.read_csv, so a non-CSV format key would presumably need a different reader - confirm.
return pd.read_csv(report_file_path)


@dataclass
class QualificationAsLocal(Qualification):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ toolOutput:
fileName: rapids_4_spark_qualification_output_cluster_information.csv
tunings:
subFolder: tuning
appsStatusReport:
fileName: rapids_4_spark_qualification_output_status.csv
summaryReport:
fileName: rapids_4_spark_qualification_output.csv
columns:
Expand Down Expand Up @@ -145,11 +147,15 @@ local:
columns:
- 'App ID'
- 'App Name'
- 'Event Log'
- 'Source Cluster'
- 'Recommended Cluster'
- 'Estimated GPU Speedup Category'
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
appsStatusReport:
name: 'rapids_4_spark_qualification_output/rapids_4_spark_qualification_output_status.csv'
outputComment: "Application status report"
costColumns:
- 'Savings Based Recommendation'
- 'Estimated App Cost'
Expand Down