diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py
index 877b43ec4d..b8dc9c5fbb 100644
--- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py
+++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py
@@ -1007,6 +1007,42 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
         logging.info("Finished processing node features")
         return node_data_dict
 
+    def _write_processed_feature(
+        self,
+        feat_name: str,
+        single_feature_df: DataFrame,
+        feature_output_path: str,
+    ) -> tuple[dict, int]:
+
+        def _get_feat_size(feat_val) -> int:
+
+            assert isinstance(
+                feat_val, (list, numbers.Number)
+            ), f"We expect features to either be scalars or lists of scalars, got {type(feat_val)}."
+
+            if isinstance(feat_val, list):
+                for val in feat_val:
+                    assert isinstance(
+                        val, numbers.Number
+                    ), f"We expect feature lists to be lists of scalars, got {type(val)}."
+
+            nfeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
+
+            return nfeat_size
+
+        logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path)
+        path_list = self._write_df(single_feature_df, feature_output_path, out_format=FORMAT_NAME)
+
+        node_feature_metadata_dict = {
+            "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
+            "data": path_list,
+        }
+
+        feat_val = single_feature_df.take(1)[0].asDict().get(feat_name)
+        nfeat_size = _get_feat_size(feat_val)
+
+        return node_feature_metadata_dict, nfeat_size
+
     def _process_node_features(
         self, feature_configs: Sequence[FeatureConfig], nodes_df: DataFrame, node_type: str
     ) -> Tuple[Dict, Dict]:
@@ -1063,62 +1099,43 @@ def _process_node_features(
                     feat_conf.feat_name
                 ] = json_representation
 
-            def write_processed_feature(
-                feat_name: str,
-                single_feature_df: DataFrame,
-                node_type: str,
-                transformer: DistFeatureTransformer,
-            ):
-                feature_output_path = os.path.join(
-                    self.output_prefix, f"node_data/{node_type}-{feat_name}"
-                )
-                logging.info(
-                    "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
-                )
-                path_list = self._write_df(
-                    single_feature_df, feature_output_path, out_format=FORMAT_NAME
-                )
-
-                node_feature_metadata_dict = {
-                    "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
-                    "data": path_list,
-                }
-                node_type_feature_metadata[feat_name] = node_feature_metadata_dict
-
-                feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None)
-                nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val)
-                ntype_feat_sizes.update({feat_name: nfeat_size})
-
-                self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
-                    perf_counter() - node_transformation_start
-                )
-
             # TODO: Remove hack with [feat_conf.feat_name]
             for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
                 node_transformation_start = perf_counter()
-
                 if (
                     feat_conf.feat_type == HUGGINGFACE_TRANFORM
                     and feat_conf.transformation_kwargs["action"] == HUGGINGFACE_TOKENIZE
                 ):
                     for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
                         single_feature_df = transformed_feature_df.select(bert_feat_name)
-                        write_processed_feature(
+                        feature_output_path = os.path.join(
+                            self.output_prefix,
+                            f"node_data/{node_type}-{bert_feat_name}",
+                        )
+                        feat_meta, feat_size = self._write_processed_feature(
                             bert_feat_name,
                             single_feature_df,
-                            node_type,
-                            transformer,
+                            feature_output_path,
                         )
+                        node_type_feature_metadata[bert_feat_name] = feat_meta
+                        ntype_feat_sizes.update({bert_feat_name: feat_size})
                 else:
                     single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
                         feat_col, feat_name
                     )
-                    write_processed_feature(
+                    feature_output_path = os.path.join(
+                        self.output_prefix, f"node_data/{node_type}-{feat_name}"
+                    )
+                    feat_meta, feat_size = self._write_processed_feature(
                         feat_name,
                         single_feature_df,
-                        node_type,
-                        transformer,
+                        feature_output_path,
                     )
+                    node_type_feature_metadata[feat_name] = feat_meta
+                    ntype_feat_sizes.update({feat_name: feat_size})
+                self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
+                    perf_counter() - node_transformation_start
+                )
 
             # Unpersist and move on to next feature
             transformed_feature_df.unpersist()
@@ -1543,31 +1560,6 @@ def _process_edge_features(
                     feat_conf.feat_name
                 ] = json_representation
 
-        def write_feature(self, feat_name, single_feature_df, edge_type, transformer_name):
-            feature_output_path = os.path.join(
-                self.output_prefix, f"edge_data/{edge_type.replace(':', '_')}-{feat_name}"
-            )
-            logging.info(
-                "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
-            )
-            path_list = self._write_df(
-                single_feature_df, feature_output_path, out_format=FORMAT_NAME
-            )
-
-            edge_feature_metadata_dict = {
-                "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
-                "data": path_list,
-            }
-            edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict
-
-            feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None)
-            efeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
-            etype_feat_sizes.update({feat_name: efeat_size})
-
-            self.timers[f"{transformer_name}-{edge_type}-{feat_name}"] = (
-                perf_counter() - edge_feature_start
-            )
-
         # TODO: Remove hack with [feat_conf.feat_name]
         for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
             edge_feature_start = perf_counter()
@@ -1578,24 +1570,35 @@ def write_feature(self, feat_name, single_feature_df, edge_type, transformer_nam
             ):
                 for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
                     single_feature_df = transformed_feature_df.select(bert_feat_name)
-                    write_feature(
-                        self,
+                    feature_output_path = os.path.join(
+                        self.output_prefix,
+                        f"edge_data/{edge_type}-{bert_feat_name}",
+                    )
+                    feat_meta, feat_size = self._write_processed_feature(
                         bert_feat_name,
                         single_feature_df,
-                        edge_type,
-                        transformer.get_transformation_name(),
+                        feature_output_path,
                     )
+                    edge_feature_metadata_dicts[bert_feat_name] = feat_meta
+                    etype_feat_sizes.update({bert_feat_name: feat_size})
             else:
                 single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
                     feat_col, feat_name
                 )
-                write_feature(
-                    self,
+                feature_output_path = os.path.join(
+                    self.output_prefix, f"edge_data/{edge_type}-{feat_name}"
+                )
+                feat_meta, feat_size = self._write_processed_feature(
                     feat_name,
                     single_feature_df,
-                    edge_type,
-                    transformer.get_transformation_name(),
+                    feature_output_path,
                 )
+                edge_feature_metadata_dicts[feat_name] = feat_meta
+                etype_feat_sizes.update({feat_name: feat_size})
+
+            self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = (
+                perf_counter() - edge_feature_start
+            )
 
         # Unpersist and move on to next feature
         transformed_feature_df.unpersist()
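The patch consolidates the two near-duplicate `write_processed_feature`/`write_feature` closures into a single `_write_processed_feature` method that returns a `(metadata, feature_size)` tuple instead of mutating enclosing-scope dicts. For reviewers, below is a minimal standalone sketch of the feature-size rule the new helper applies to the first row of each written column; the bare `get_feat_size` wrapper and the example values are illustrative only and not part of the patch:

```python
# Illustrative sketch -- not part of the patch. Mirrors the nested
# _get_feat_size helper introduced in _write_processed_feature:
# scalar features report size 1, list features report their length,
# and anything else fails the assertion.
import numbers


def get_feat_size(feat_val) -> int:
    assert isinstance(
        feat_val, (list, numbers.Number)
    ), f"We expect features to either be scalars or lists of scalars, got {type(feat_val)}."
    if isinstance(feat_val, list):
        for val in feat_val:
            assert isinstance(
                val, numbers.Number
            ), f"We expect feature lists to be lists of scalars, got {type(val)}."
    return 1 if isinstance(feat_val, numbers.Number) else len(feat_val)


assert get_feat_size(0.5) == 1        # scalar feature -> feature size 1
assert get_feat_size([1, 2, 3]) == 3  # vector feature -> its length
```

Returning the tuple also lets the node path populate `node_type_feature_metadata`/`ntype_feat_sizes` and the edge path populate `edge_feature_metadata_dicts`/`etype_feat_sizes` at the call site, so the shared helper no longer needs to know which entity type it is writing.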