diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst
index 1abecb341d..4158a69a95 100644
--- a/docs/source/gs-processing/developer/input-configuration.rst
+++ b/docs/source/gs-processing/developer/input-configuration.rst
@@ -385,8 +385,13 @@ arguments.
   imputation. Can take the following values:
   - ``none``: (Default) Don't normalize the numerical values during encoding.
   - ``min-max``: Normalize each value by subtracting the minimum value from it,
-  and then dividing it by the difference between the maximum value and the minimum.
-  - ``standard``: Normalize each value by dividing it by the sum of all the values.
+  and then dividing it by the difference between the maximum value and the minimum.
+  - ``standard``: Normalize each value by dividing it by the sum of all the values.
+  - ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-Gauss first ranks all values, converts
+  the ranks to the (-1, 1) range, and applies the `inverse of the error function <https://en.wikipedia.org/wiki/Error_function#Inverse_functions>`_
+  to make the values conform to a Gaussian distribution shape. This transformation only supports a single column as input.
+  - ``epsilon``: Only relevant for ``rank-gauss``; this value is used to clip the normalized ranks away from -1 and 1,
+  avoiding infinite values when applying the inverse error function.
 - ``multi-numerical``
 
     - Column-wise transformation for vector-like numerical data using a missing data imputer and an
diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
index cb10830d4a..89bfd568b9 100644
--- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
+++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
@@ -112,6 +112,19 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
                 "slide_window_size": gconstruct_transform_dict["slide_window_size"],
                 "imputer": "none",
             }
+        elif gconstruct_transform_dict["name"] == "rank_gauss":
+            gsp_transformation_dict["name"] = "numerical"
+            if "epsilon" in gconstruct_transform_dict:
+                gsp_transformation_dict["kwargs"] = {
+                    "epsilon": gconstruct_transform_dict["epsilon"],
+                    "normalizer": "rank-gauss",
+                    "imputer": "none",
+                }
+            else:
+                gsp_transformation_dict["kwargs"] = {
+                    "normalizer": "rank-gauss",
+                    "imputer": "none",
+                }
         # TODO: Add support for other common transformations here
         else:
             raise ValueError(
diff --git a/graphstorm-processing/graphstorm_processing/config/numerical_configs.py b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
index 6544d5f14e..585862b85b 100644
--- a/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
+++ b/graphstorm-processing/graphstorm_processing/config/numerical_configs.py
@@ -32,7 +32,7 @@ class NumericalFeatureConfig(FeatureConfig):
     normalizer: str
         A normalization to apply to each column. Valid values are
-        "none", "min-max", and "standard".
+        "none", "min-max", "standard", and "rank-gauss".
 
         The transformation applied will be:
 
         * "none": (Default) Don't normalize the numerical values during encoding.
         * "min-max": Normalize each value by subtracting the minimum value from it,
         and then dividing it by the difference between the maximum value and the minimum.
         * "standard": Normalize each value by dividing it by the sum of all the values.
+        * "rank-gauss": Normalize each value using Rank-Gauss normalization.
""" def __init__(self, config: Mapping): diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index c2f6c601e6..6ac05eb2ef 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -42,4 +42,4 @@ ################# Numerical transformations ################ VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"] -VALID_NORMALIZERS = ["none", "min-max", "standard"] +VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"] diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py index f68fc98208..7387a81a41 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_numerical_transformation.py @@ -15,6 +15,7 @@ """ import logging from typing import Optional, Sequence +import uuid from pyspark.sql import DataFrame from pyspark.sql import functions as F @@ -26,6 +27,10 @@ from pyspark.ml.functions import array_to_vector, vector_to_array import numpy as np +import pandas as pd + +# pylint: disable = no-name-in-module +from scipy.special import erfinv from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS from .base_dist_transformation import DistributedTransformation @@ -74,7 +79,9 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: Data return imputed_df -def apply_norm(cols: Sequence[str], shared_norm: str, imputed_df: DataFrame) -> DataFrame: +def apply_norm( + cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, epsilon: float = 1e-6 +) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns provided in the cols argument. @@ -84,10 +91,13 @@ def apply_norm(cols: Sequence[str], shared_norm: str, imputed_df: DataFrame) -> List of column names to apply normalization to. shared_norm : str The type of normalization to use. Valid values are "none", "min-max", - "standard". + "standard", "rank-gauss". imputed_df : DataFrame The input DataFrame to apply normalization to. It should not contain missing values. + epsilon: float + Epsilon for normalization used to avoid INF float during computation + on "rank-gauss". Returns ------- @@ -146,6 +156,36 @@ def single_vec_to_float(vec): scaled_df = imputed_df.select( [(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols ) + elif shared_norm == "rank-gauss": + assert len(cols) == 1, "Rank-Guass numerical transformation only supports single column" + column_name = cols[0] + select_df = imputed_df.select(column_name) + # original id is the original order for the input data frame, + # value rank indicates the rank of each value in the column + # We need original id to help us restore the order. 
+        original_order_col = f"original-order-{uuid.uuid4().hex[:8]}"
+        value_rank_col = f"value-rank-{uuid.uuid4().hex[:8]}"
+        df_with_order_idx = select_df.withColumn(
+            original_order_col, F.monotonically_increasing_id()
+        )
+        value_sorted_df = df_with_order_idx.orderBy(column_name)
+        value_rank_df = value_sorted_df.withColumn(value_rank_col, F.monotonically_increasing_id())
+
+        # pylint: disable = cell-var-from-loop
+        # num_rows must be defined outside gauss_transform and captured by closure,
+        # otherwise the pandas UDF will raise an error.
+        def gauss_transform(rank: pd.Series) -> pd.Series:
+            feat_range = num_rows - 1
+            clipped_rank = (rank / feat_range - 0.5) * 2
+            clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
+            return pd.Series(erfinv(clipped_rank))
+
+        num_rows = value_rank_df.count()
+        gauss_udf = F.pandas_udf(gauss_transform, FloatType())
+        normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col))
+        scaled_df = normalized_df.orderBy(original_order_col).drop(
+            value_rank_col, original_order_col
+        )
 
     return scaled_df
 
@@ -160,16 +200,21 @@ class DistNumericalTransformation(DistributedTransformation):
         The list of columns to apply the transformations on.
     normalizer : str
         The normalization to apply to the columns.
-        Valid values are "none", "min-max", and "standard".
+        Valid values are "none", "min-max", "standard", and "rank-gauss".
     imputer : str
         The type of missing value imputation to apply to the column.
         Valid values are "mean", "median" and "most_frequent".
+    epsilon: float
+        Small value used during rank-gauss normalization to avoid infinite values.
     """
 
-    def __init__(self, cols: Sequence[str], normalizer: str, imputer: str) -> None:
+    def __init__(
+        self, cols: Sequence[str], normalizer: str, imputer: str, epsilon: float = 1e-6
+    ) -> None:
         super().__init__(cols)
         self.cols = cols
         self.shared_norm = normalizer
+        self.epsilon = epsilon
         # Spark uses 'mode' for the most frequent element
         self.shared_imputation = "mode" if imputer == "most_frequent" else imputer
 
@@ -179,7 +224,7 @@ def apply(self, input_df: DataFrame) -> DataFrame:
         )
 
         imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
-        scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df)
+        scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.epsilon)
 
         # TODO: Figure out why the transformation is producing Double values, and switch to float
         return scaled_df
diff --git a/graphstorm-processing/pyproject.toml b/graphstorm-processing/pyproject.toml
index 95bbc0d61a..7bb87f2752 100644
--- a/graphstorm-processing/pyproject.toml
+++ b/graphstorm-processing/pyproject.toml
@@ -18,6 +18,7 @@ joblib = "^1.3.1"
 pandas = "^1.3.5"
 psutil = "^5.9.5"
 sagemaker = "^2.83.0"
+scipy = "^1.10.1"
 
 [tool.poetry.group.dev]
 optional = true
diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py
index 59dbc1b662..5bad285451 100644
--- a/graphstorm-processing/tests/test_converter.py
+++ b/graphstorm-processing/tests/test_converter.py
@@ -217,6 +217,16 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
                     "slide_window_size": 5,
                 },
             },
+            {
+                "feature_col": ["num_citations"],
+                "feature_name": "rank_gauss1",
+                "transform": {"name": "rank_gauss"},
+            },
+            {
+                "feature_col": ["num_citations"],
+                "feature_name": "rank_gauss2",
+                "transform": {"name": "rank_gauss", "epsilon": 0.1},
+            },
         ],
         "labels": [
             {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}
@@ -273,6 +283,22 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
                 },
             },
         },
+        {
+            "column": "num_citations",
+            "name": "rank_gauss1",
+            "transformation": {
+                "name": "numerical",
+                "kwargs": {"normalizer": "rank-gauss", "imputer": "none"},
+            },
+        },
+        {
+            "column": "num_citations",
+            "name": "rank_gauss2",
+            "transformation": {
+                "name": "numerical",
+                "kwargs": {"epsilon": 0.1, "normalizer": "rank-gauss", "imputer": "none"},
+            },
+        },
     ]
     assert nodes_output["labels"] == [
         {
diff --git a/graphstorm-processing/tests/test_dist_numerical_transformation.py b/graphstorm-processing/tests/test_dist_numerical_transformation.py
index 6fee80f6c0..816a0420c5 100644
--- a/graphstorm-processing/tests/test_dist_numerical_transformation.py
+++ b/graphstorm-processing/tests/test_dist_numerical_transformation.py
@@ -17,9 +17,11 @@
 import pytest
 
 import pandas as pd
-from numpy.testing import assert_array_equal, assert_array_almost_equal
+import numpy as np
+from numpy.testing import assert_array_equal, assert_array_almost_equal, assert_almost_equal
 from pyspark.sql import SparkSession, DataFrame, functions as F
 from pyspark.sql.types import ArrayType, FloatType, StructField, StructType, StringType
+from scipy.special import erfinv
 
 from graphstorm_processing.data_transformations.dist_transformations import (
     DistNumericalTransformation,
@@ -297,3 +299,66 @@ def test_multi_numerical_transformation_with_array_input(spark: SparkSession, ch
     transformed_rows = transformed_df.collect()
     for row, expected_vector in zip(transformed_rows, expected_vals):
         assert_array_almost_equal(row["feat"], expected_vector, decimal=3)
+
+
+def rank_gauss(feat, eps):
+    lower = -1 + eps
+    upper = 1 - eps
+    value_range = upper - lower
+    i = np.argsort(feat, axis=0)
+    j = np.argsort(i, axis=0)
+    j_range = len(j) - 1
+    divider = j_range / value_range
+    feat = j / divider
+    feat = feat - upper
+    return erfinv(feat)
+
+
+@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
+def test_rank_gauss(spark: SparkSession, check_df_schema, epsilon):
+    data = [(0.0,), (15.0,), (26.0,), (40.0,)]
+
+    input_df = spark.createDataFrame(data, schema=["age"])
+    rg_transformation = DistNumericalTransformation(
+        ["age"], imputer="none", normalizer="rank-gauss", epsilon=epsilon
+    )
+
+    output_df = rg_transformation.apply(input_df)
+    check_df_schema(output_df)
+
+    out_rows = output_df.collect()
+
+    expected_vals = rank_gauss(np.array([[0.0], [15.0], [26.0], [40.0]]), epsilon)
+    for i, row in enumerate(out_rows):
+        assert_almost_equal(
+            [row["age"]], expected_vals[i, :], decimal=4, err_msg=f"Row {i} is not equal"
+        )
+
+
+@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
+def test_rank_gauss_reshuffling(spark: SparkSession, check_df_schema, epsilon):
+    # Create DF with 1,000 random values
+    random_values = np.random.rand(10**3, 1)
+
+    # Convert the array of values into a list of single-value lists
+    data = [(float(value),) for value in random_values]
+    input_df = spark.createDataFrame(data, schema=["rand"])
+    # repartition dataset
+    input_df = input_df.repartition(4)
+    # collect partitioned data pre-transform
+    part_rows = [[row["rand"]] for row in input_df.collect()]
+
+    rg_transformation = DistNumericalTransformation(
+        ["rand"], imputer="none", normalizer="rank-gauss", epsilon=epsilon
+    )
+
+    output_df = rg_transformation.apply(input_df)
+    check_df_schema(output_df)
+
+    out_rows = output_df.collect()
+
+    expected_vals = rank_gauss(np.array(part_rows), epsilon)
+    for i, row in enumerate(out_rows):
+        assert_almost_equal(
+            [row["rand"]], expected_vals[i, :], decimal=4, err_msg=f"Row {i} is not equal"
+        )
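
Reviewer note: as a quick single-machine illustration of the rank-gauss logic added in dist_numerical_transformation.py, the sketch below mirrors the same steps without Spark: rank via double argsort, rescale the ranks to (-1, 1), clip by epsilon, then apply the inverse error function. The function name rank_gauss_1d and the sample values are illustrative only, not part of this change; it assumes at least two input values.

import numpy as np
from scipy.special import erfinv  # pylint: disable=no-name-in-module


def rank_gauss_1d(values: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
    # Double argsort yields each value's rank at its original position.
    ranks = np.argsort(np.argsort(values))
    # Rescale ranks from [0, n - 1] to [-1, 1].
    scaled = (ranks / (len(values) - 1) - 0.5) * 2
    # Clip away from -1 and 1 so erfinv does not return +/-inf.
    clipped = np.clip(scaled, epsilon - 1, 1 - epsilon)
    return erfinv(clipped)


print(rank_gauss_1d(np.array([0.0, 15.0, 26.0, 40.0])))
# Approximately [-3.46, -0.30, 0.30, 3.46]: the ranks now follow a
# Gaussian shape, and the output order matches the input order.

This is effectively what the rank_gauss test helper above computes as well, so it can double as a scratchpad when checking the expected test values.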