diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst
index a9753a842b..b667cdb8d2 100644
--- a/docs/source/gs-processing/developer/input-configuration.rst
+++ b/docs/source/gs-processing/developer/input-configuration.rst
@@ -391,6 +391,8 @@ arguments.
       - ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-gauss first ranks all values,
         converts the ranks to the -1/1 range, and applies the `inverse of the error function `_ to make the values
         conform to a Gaussian distribution shape. This transformation only supports a single column as input.
+    - ``out_dtype`` (String, optional): Specify the data type of the transformed feature.
+      Currently, only ``float32`` and ``float64`` are supported.
     - ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
       to avoid infinite values during normalization.

   - ``multi-numerical``

@@ -406,6 +408,8 @@ arguments.
     - ``separator`` (String, optional): Same as for ``no-op`` transformation, used to separate numerical values
       in CSV input. If the input data are in Parquet format, each value in the column is assumed to be an array of floats.
+    - ``out_dtype`` (String, optional): Specify the data type of the transformed feature.
+      Currently, only ``float32`` and ``float64`` are supported.

 - ``bucket-numerical``

diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
index b09d5a82d8..3fa46e7326 100644
--- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
+++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
@@ -102,7 +102,15 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
         if gconstruct_transform_dict["name"] == "max_min_norm":
             gsp_transformation_dict["name"] = "numerical"
-            gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "none"}
+            gsp_transformation_dict["kwargs"] = {
+                "normalizer": "min-max",
+                "imputer": "none",
+            }
+
+            if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
+                gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
+                    "out_dtype"
+                ]
         elif gconstruct_transform_dict["name"] == "bucket_numerical":
             gsp_transformation_dict["name"] = "bucket-numerical"
             assert (
@@ -119,17 +127,19 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
             }
         elif gconstruct_transform_dict["name"] == "rank_gauss":
             gsp_transformation_dict["name"] = "numerical"
+            gsp_transformation_dict["kwargs"] = {
+                "normalizer": "rank-gauss",
+                "imputer": "none",
+            }
+
             if "epsilon" in gconstruct_transform_dict:
-                gsp_transformation_dict["kwargs"] = {
-                    "epsilon": gconstruct_transform_dict["epsilon"],
-                    "normalizer": "rank-gauss",
-                    "imputer": "none",
-                }
-            else:
-                gsp_transformation_dict["kwargs"] = {
-                    "normalizer": "rank-gauss",
-                    "imputer": "none",
-                }
+                gsp_transformation_dict["kwargs"]["epsilon"] = gconstruct_transform_dict[
+                    "epsilon"
+                ]
+            if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
+                gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
+                    "out_dtype"
+                ]
         elif gconstruct_transform_dict["name"] == "to_categorical":
             if "separator" in gconstruct_transform_dict:
                 gsp_transformation_dict["name"] = "multi-categorical"
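[Editor's note] To make the converter change above concrete, here is a minimal sketch of the mapping it performs. The input dict is illustrative only; note that only "float32"/"float64" values are forwarded, and anything else (e.g. "float16") is silently dropped rather than rejected:

```python
# Illustrative GConstruct transform entry (hypothetical values):
gconstruct_transform = {"name": "rank_gauss", "epsilon": 1e-6, "out_dtype": "float64"}

# Equivalent GSProcessing transformation kwargs produced by _convert_feature above:
gsp_kwargs = {
    "normalizer": "rank-gauss",
    "imputer": "none",
    "epsilon": 1e-6,
    "out_dtype": "float64",  # omitted entirely if the requested dtype is unsupported
}
```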
diff --git a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
index 20979eda2a..f169d2b718 100644
--- a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
+++ b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
@@ -17,6 +17,8 @@ import abc
 from typing import Any, Mapping, Sequence

+from graphstorm_processing.constants import VALID_OUTDTYPE, TYPE_FLOAT32
+
 from .data_config_base import DataStorageConfig


@@ -96,6 +98,7 @@ def __init__(self, config: Mapping):
         super().__init__(config)

         self.value_separator = None
+        self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
         if self._transformation_kwargs:
             self.value_separator = self._transformation_kwargs.get("separator")

@@ -105,3 +108,6 @@ def _sanity_check(self) -> None:
         super()._sanity_check()
         if self._data_config and self.value_separator and self._data_config.format != "csv":
             raise RuntimeError("separator should only be provided for CSV data")
+        assert (
+            self.out_dtype in VALID_OUTDTYPE
+        ), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"
diff --git a/graphstorm-processing/graphstorm_processing/config/numerical_configs.py b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
index acd737f51d..95f1974648 100644
--- a/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
+++ b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
@@ -17,7 +17,13 @@
 from typing import Mapping
 import numbers

-from graphstorm_processing.constants import VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    VALID_OUTDTYPE,
+    TYPE_FLOAT32,
+)
+
 from .feature_config_base import FeatureConfig
@@ -42,12 +48,17 @@ class NumericalFeatureConfig(FeatureConfig):
         and then dividing it by the difference between the maximum value and the minimum.
         * "standard": Normalize each value by dividing it by the sum of all the values.
         * "rank-gauss": Normalize each value by rank gauss normalization.
+
+    out_dtype: str
+        Output feature dtype. Currently, we support ``float32`` and ``float64``.
+        Default is ``float32``.
     """

     def __init__(self, config: Mapping):
         super().__init__(config)
         self.imputer = self._transformation_kwargs.get("imputer", "none")
         self.norm = self._transformation_kwargs.get("normalizer", "none")
+        self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)

         self._sanity_check()

@@ -59,6 +70,9 @@ def _sanity_check(self) -> None:
         assert (
             self.norm in VALID_NORMALIZERS
         ), f"Unknown normalizer requested, expected one of {VALID_NORMALIZERS}, got {self.norm}"
+        assert (
+            self.out_dtype in VALID_OUTDTYPE
+        ), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"


 class MultiNumericalFeatureConfig(NumericalFeatureConfig):
diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py
index f849c9c29f..db57c6260c 100644
--- a/graphstorm-processing/graphstorm_processing/constants.py
+++ b/graphstorm-processing/graphstorm_processing/constants.py
@@ -14,6 +14,8 @@ limitations under the License.
""" +from pyspark.sql.types import FloatType, DoubleType + ################### Categorical Limits ####################### MAX_CATEGORIES_PER_FEATURE = 100 RARE_CATEGORY = "GSP_CONSTANT_OTHER" @@ -44,6 +46,10 @@ ################# Numerical transformations ################ VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"] VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"] +TYPE_FLOAT32 = "float32" +TYPE_FLOAT64 = "float64" +VALID_OUTDTYPE = [TYPE_FLOAT32, TYPE_FLOAT64] +DTYPE_MAP = {TYPE_FLOAT32: FloatType(), TYPE_FLOAT64: DoubleType()} ################# Bert transformations ################ HUGGINGFACE_TRANFORM = "huggingface" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_noop_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_noop_transformation.py index bd785eb212..563f75a5bd 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_noop_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_noop_transformation.py @@ -18,9 +18,10 @@ from pyspark.sql import DataFrame from pyspark.sql import functions as F -from pyspark.sql.types import ArrayType, DoubleType, NumericType +from pyspark.sql.types import ArrayType, NumericType + +from graphstorm_processing.constants import SPECIAL_CHARACTERS, DTYPE_MAP, TYPE_FLOAT32 -from graphstorm_processing.constants import SPECIAL_CHARACTERS from .base_dist_transformation import DistributedTransformation @@ -38,12 +39,17 @@ class NoopTransformation(DistributedTransformation): The list of columns to parse as floats or lists of float separator : Optional[str], optional Optional separator to use to split the string, by default None + out_dtype: str + The output feature dtype """ - def __init__(self, cols: List[str], separator: Optional[str] = None) -> None: + def __init__( + self, cols: List[str], out_dtype: str = TYPE_FLOAT32, separator: Optional[str] = None + ) -> None: super().__init__(cols) # TODO: Support multiple cols? 
+        self.out_dtype = out_dtype
         self.separator = separator
         # Spark's split function uses a regexp so we need to
         # escape special chars to be used as separators
@@ -78,7 +84,7 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[flo
             return None

         strvec_to_float_vec_udf = F.udf(
-            str_list_to_float_vec, ArrayType(DoubleType(), containsNull=False)
+            str_list_to_float_vec, ArrayType(DTYPE_MAP[self.out_dtype], containsNull=False)
         )

         if self.separator:
@@ -92,7 +98,10 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[flo
             return input_df
         else:
             return input_df.select(
-                [F.col(column).cast(DoubleType()).alias(column) for column in self.cols]
+                [
+                    F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column)
+                    for column in self.cols
+                ]
             )

     @staticmethod
diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py
index 5b51a702bd..80d4c2a7ba 100644
--- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py
+++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py
@@ -20,7 +20,7 @@
 from pyspark.sql import DataFrame
 from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, FloatType
+from pyspark.sql.types import ArrayType
 from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct
 from pyspark.ml.linalg import DenseVector
 from pyspark.ml.stat import Summarizer
@@ -33,7 +33,13 @@
 # pylint: disable = no-name-in-module
 from scipy.special import erfinv

-from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    SPECIAL_CHARACTERS,
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    DTYPE_MAP,
+    TYPE_FLOAT32,
+)

 from .base_dist_transformation import DistributedTransformation
 from ..spark_utils import rename_multiple_cols
@@ -81,7 +87,11 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: Data


 def apply_norm(
-    cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, epsilon: float = 1e-6
+    cols: Sequence[str],
+    shared_norm: str,
+    imputed_df: DataFrame,
+    out_dtype: str = TYPE_FLOAT32,
+    epsilon: float = 1e-6,
 ) -> DataFrame:
     """Applies a single normalizer to the imputed dataframe, individually to each of the columns
     provided in the cols argument.
@@ -96,6 +106,8 @@ def apply_norm(
     imputed_df : DataFrame
         The input DataFrame to apply normalization to. It should not contain
         missing values.
+    out_dtype: str
+        The output feature dtype.
     epsilon: float
         Epsilon for normalization used to avoid INF float during computation
         on "rank-gauss".
@@ -109,13 +121,20 @@ def apply_norm(
     ------
     RuntimeError
         If missing values exist in the data when the "standard" normalizer is used.
+
+    ValueError
+        If unsupported feature output dtype is provided.
""" other_cols = list(set(imputed_df.columns).difference(cols)) def single_vec_to_float(vec): return float(vec[0]) - vec_udf = F.udf(single_vec_to_float, FloatType()) + # Use the map to get the corresponding data type object, or raise an error if not found + if out_dtype in DTYPE_MAP: + vec_udf = F.udf(single_vec_to_float, DTYPE_MAP[out_dtype]) + else: + raise ValueError("Unsupported feature output dtype") assert shared_norm in VALID_NORMALIZERS, ( f"Unsupported normalization requested: {shared_norm}, the supported " @@ -123,6 +142,8 @@ def single_vec_to_float(vec): ) if shared_norm == "none": + # Save the time and efficiency for not casting the type + # when not doing any normalization scaled_df = imputed_df elif shared_norm == "min-max": # Because the scalers expect Vector input, we need to use VectorAssembler on each, @@ -155,7 +176,8 @@ def single_vec_to_float(vec): "normalization. Use an imputer in the transformation." ) scaled_df = imputed_df.select( - [(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols + [(F.col(c) / col_sums[f"sum({c})"]).cast(DTYPE_MAP[out_dtype]).alias(c) for c in cols] + + other_cols ) elif shared_norm == "rank-gauss": assert len(cols) == 1, "Rank-Guass numerical transformation only supports single column" @@ -182,7 +204,7 @@ def gauss_transform(rank: pd.Series) -> pd.Series: return pd.Series(erfinv(clipped_rank)) num_rows = value_rank_df.count() - gauss_udf = F.pandas_udf(gauss_transform, FloatType()) + gauss_udf = F.pandas_udf(gauss_transform, DTYPE_MAP[out_dtype]) normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col)) scaled_df = normalized_df.orderBy(original_order_col).drop( value_rank_col, original_order_col @@ -205,17 +227,25 @@ class DistNumericalTransformation(DistributedTransformation): imputer : str The type of missing value imputation to apply to the column. Valid values are "mean", "median" and "most_frequent". + out_dtype: str + Output feature dtype epsilon: float Epsilon for normalization used to avoid INF float during computation. """ def __init__( - self, cols: Sequence[str], normalizer: str, imputer: str, epsilon: float = 1e-6 + self, + cols: Sequence[str], + normalizer: str, + imputer: str, + out_dtype: str = TYPE_FLOAT32, + epsilon: float = 1e-6, ) -> None: super().__init__(cols) self.cols = cols self.shared_norm = normalizer self.epsilon = epsilon + self.out_dtype = out_dtype # Spark uses 'mode' for the most frequent element self.shared_imputation = "mode" if imputer == "most_frequent" else imputer @@ -225,7 +255,9 @@ def apply(self, input_df: DataFrame) -> DataFrame: ) imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df) - scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.epsilon) + scaled_df = apply_norm( + self.cols, self.shared_norm, imputed_df, self.out_dtype, self.epsilon + ) # TODO: Figure out why the transformation is producing Double values, and switch to float return scaled_df @@ -251,10 +283,17 @@ class DistMultiNumericalTransformation(DistNumericalTransformation): imputer : str The type of missing value imputation to apply to the column. Valid values are "mean", "median" and "most_frequent". 
+    out_dtype: str
+        Output feature dtype, by default float32
     """

     def __init__(
-        self, cols: Sequence[str], separator: Optional[str], normalizer: str, imputer: str
+        self,
+        cols: Sequence[str],
+        separator: Optional[str],
+        normalizer: str,
+        imputer: str,
+        out_dtype: str = TYPE_FLOAT32,
     ) -> None:
         assert (
             len(cols) == 1
@@ -269,6 +308,7 @@ def __init__(
         # special chars to be used as separators
         if self.separator in SPECIAL_CHARACTERS:
             self.separator = f"\\{self.separator}"
+        self.out_dtype = out_dtype

     @staticmethod
     def get_transformation_name() -> str:
@@ -343,7 +383,7 @@ def convert_multistring_to_sequence_df(
                 ),  # Split along the separator
                 replace_empty_with_nan,
             )
-            .cast(ArrayType(FloatType(), True))
+            .cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
             .alias(self.multi_column)
         )

@@ -405,7 +445,7 @@ def vector_df_has_nan(vector_df: DataFrame, vector_col: str) -> bool:
         else:
             split_array_df = input_df.select(
                 F.col(self.multi_column)
-                .cast(ArrayType(FloatType(), True))
+                .cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
                 .alias(self.multi_column)
             )

diff --git a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json
index d9458ad317..7967f5089b 100644
--- a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json
+++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json
@@ -41,7 +41,8 @@
                     "name": "numerical",
                     "kwargs": {
                         "normalizer": "none",
-                        "imputer": "mean"
+                        "imputer": "mean",
+                        "out_dtype": "float64"
                     }
                 }
             },
diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py
index d065f0ecf8..54078e4700 100644
--- a/graphstorm-processing/tests/test_converter.py
+++ b/graphstorm-processing/tests/test_converter.py
@@ -53,7 +53,7 @@ def test_try_read_file_with_wildcard(


 def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node_dict: dict):
-    """We currently only support no-op and numerical features, so should error out otherwise."""
+    """Conversion should error out when an unknown feature transformation type is given."""
     node_dict["nodes"][0]["features"] = [
         {
             "feature_col": ["paper_title"],
@@ -65,6 +65,60 @@ def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node
         _ = converter.convert_nodes(node_dict["nodes"])


+@pytest.mark.parametrize("transform", ["max_min_norm", "rank_gauss"])
+@pytest.mark.parametrize("out_dtype", ["float16", "float32", "float64"])
+def test_try_convert_out_dtype(
+    converter: GConstructConfigConverter, node_dict: dict, transform: str, out_dtype: str
+):
+    node_dict["nodes"][0]["features"] = [
+        {
+            "feature_col": ["paper_title"],
+            "transform": {"name": transform, "out_dtype": out_dtype},
+        }
+    ]
+
+    normalizer_dict = {"max_min_norm": "min-max", "rank_gauss": "rank-gauss"}
+    res = converter.convert_nodes(node_dict["nodes"])[0]
+    if out_dtype == "float32":
+        assert res.features == [
+            {
+                "column": "paper_title",
+                "transformation": {
+                    "kwargs": {
+                        "imputer": "none",
+                        "normalizer": normalizer_dict[transform],
+                        "out_dtype": "float32",
+                    },
+                    "name": "numerical",
+                },
+            }
+        ]
+    elif out_dtype == "float64":
+        assert res.features == [
+            {
+                "column": "paper_title",
+                "transformation": {
+                    "kwargs": {
+                        "imputer": "none",
+                        "normalizer": normalizer_dict[transform],
+                        "out_dtype": "float64",
+                    },
+                    "name": "numerical",
+                },
+            }
+        ]
+    elif out_dtype == "float16":
+        assert res.features == [
+            {
+                "column": "paper_title",
"transformation": { + "kwargs": {"imputer": "none", "normalizer": normalizer_dict[transform]}, + "name": "numerical", + }, + } + ] + + @pytest.mark.parametrize("col_name", ["citation_time", ["citation_time"]]) def test_read_node_gconstruct(converter: GConstructConfigConverter, node_dict: dict, col_name: str): """Multiple test cases for GConstruct node conversion""" diff --git a/graphstorm-processing/tests/test_dist_numerical_transformation.py b/graphstorm-processing/tests/test_dist_numerical_transformation.py index 0e8ed61306..24f1030eda 100644 --- a/graphstorm-processing/tests/test_dist_numerical_transformation.py +++ b/graphstorm-processing/tests/test_dist_numerical_transformation.py @@ -21,7 +21,15 @@ import numpy as np from numpy.testing import assert_array_equal, assert_array_almost_equal, assert_almost_equal from pyspark.sql import SparkSession, DataFrame, functions as F -from pyspark.sql.types import ArrayType, FloatType, StructField, StructType, StringType +from pyspark.sql.types import ( + ArrayType, + FloatType, + DoubleType, + StructField, + StructType, + StringType, + LongType, +) from scipy.special import erfinv from graphstorm_processing.data_transformations.dist_transformations import ( @@ -114,6 +122,37 @@ def test_numerical_transformation_without_transformation(input_df: DataFrame, ch assert row["salary"] == expected_salary +@pytest.mark.parametrize("norm", ["min-max", "standard", "rank-gauss"]) +@pytest.mark.parametrize("out_dtype", ["float32", "float64"]) +def test_numerical_min_max_transformation_precision( + spark: SparkSession, check_df_schema, out_dtype, norm +): + """Test numerical transformation without any transformation applied""" + # Adjust the number to be an integer + high_precision_integer = 1.2345678901234562 + data = [(high_precision_integer,)] + schema = StructType([StructField("age", FloatType(), True)]) + input_df = spark.createDataFrame(data, schema=schema) + + dist_numerical_transformation = DistNumericalTransformation( + ["age"], imputer="none", normalizer="min-max", out_dtype=out_dtype + ) + + transformed_df = dist_numerical_transformation.apply(input_df) + check_df_schema(transformed_df) + column_data_type = [ + field.dataType for field in transformed_df.schema.fields if field.name == "age" + ][0] + if out_dtype == "float32": + assert isinstance( + column_data_type, FloatType + ), f"The column 'age' is not of type FloatType." + elif out_dtype == "float64": + assert isinstance( + column_data_type, DoubleType + ), f"The column 'age' is not of type DoubleType." + + def test_numerical_transformation_with_median_imputer_and_std_norm( input_df: DataFrame, check_df_schema ): @@ -132,7 +171,7 @@ def test_numerical_transformation_with_median_imputer_and_std_norm( expected_imputed_std_ages = [0.2, 0.2, 0.1, 0.3, 0.2] for row, expected_val in zip(transformed_rows, expected_imputed_std_ages): - assert row["age"] == expected_val + assert_almost_equal(row["age"], expected_val, decimal=7) def test_multi_numerical_transformation_without_norm_and_imputer(input_df: DataFrame):