[GSProcessing] Add support for Rank-Gauss Feature Transformation (#615)
*Issue #, if available:*

*Description of changes:*


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Theodore Vasiloudis <[email protected]>
jalencato and thvasilo authored Nov 7, 2023
1 parent 4646462 commit 21eda1f
Showing 8 changed files with 166 additions and 10 deletions.
9 changes: 7 additions & 2 deletions docs/source/gs-processing/developer/input-configuration.rst
@@ -385,8 +385,13 @@ arguments.
imputation. Can take the following values:
- ``none``: (Default) Don't normalize the numerical values during encoding.
- ``min-max``: Normalize each value by subtracting the minimum value from it,
  and then dividing it by the difference between the maximum value and the minimum.
- ``standard``: Normalize each value by dividing it by the sum of all the values.
- ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-Gauss first ranks all values,
  scales the ranks to the (-1, 1) range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
  to the shape of a Gaussian distribution (see the sketch below). This transformation only supports a single column as input.
- ``epsilon``: Only relevant for ``rank-gauss``; this epsilon value is used to clip the scaled ranks
  away from -1 and 1, avoiding infinite values during normalization.
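The following minimal NumPy/SciPy sketch (illustrative only; the function name and sample values are hypothetical) mirrors the steps described above:

import numpy as np
from scipy.special import erfinv

def rank_gauss_sketch(values, epsilon=1e-6):
    # Rank every value: a double argsort yields each element's rank.
    ranks = np.argsort(np.argsort(values))
    # Scale ranks from [0, n - 1] to [-1, 1].
    scaled = (ranks / (len(values) - 1) - 0.5) * 2
    # Clip away from the endpoints so that erfinv stays finite.
    clipped = np.clip(scaled, epsilon - 1, 1 - epsilon)
    # Map onto a Gaussian shape via the inverse error function.
    return erfinv(clipped)

print(rank_gauss_sketch(np.array([0.0, 15.0, 26.0, 40.0])))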
- ``multi-numerical``

- Column-wise transformation for vector-like numerical data using a missing data imputer and an
@@ -112,6 +112,19 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
"slide_window_size": gconstruct_transform_dict["slide_window_size"],
"imputer": "none",
}
elif gconstruct_transform_dict["name"] == "rank_gauss":
gsp_transformation_dict["name"] = "numerical"
if "epsilon" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"] = {
"epsilon": gconstruct_transform_dict["epsilon"],
"normalizer": "rank-gauss",
"imputer": "none",
}
else:
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
}
# TODO: Add support for other common transformations here
else:
raise ValueError(
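For reference, here is a hypothetical input/output pair for the rank_gauss branch above, mirroring the converter tests added later in this commit:

# GConstruct feature entry (converter input):
gconstruct_feature = {
    "feature_col": ["num_citations"],
    "feature_name": "rank_gauss2",
    "transform": {"name": "rank_gauss", "epsilon": 0.1},
}

# Resulting GSProcessing transformation dict (converter output):
expected_transformation = {
    "name": "numerical",
    "kwargs": {"epsilon": 0.1, "normalizer": "rank-gauss", "imputer": "none"},
}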
@@ -32,14 +32,15 @@ class NumericalFeatureConfig(FeatureConfig):
    normalizer: str
        A normalization to apply to each column. Valid values are
        "none", "min-max", "standard", and "rank-gauss".
        The transformation applied will be:
        * "none": (Default) Don't normalize the numerical values during encoding.
        * "min-max": Normalize each value by subtracting the minimum value from it,
          and then dividing it by the difference between the maximum value and the minimum.
        * "standard": Normalize each value by dividing it by the sum of all the values.
        * "rank-gauss": Normalize each value using Rank-Gauss normalization.
    """

    def __init__(self, config: Mapping):
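As a rough sketch of how this config class might be instantiated (the feature entry below is hypothetical, and it assumes NumericalFeatureConfig accepts the feature-entry mapping format that the GConstruct converter produces):

# Hypothetical feature entry, in the format produced by the converter above.
feature_entry = {
    "column": "num_citations",
    "name": "rank_gauss_feature",
    "transformation": {
        "name": "numerical",
        "kwargs": {"normalizer": "rank-gauss", "imputer": "none"},
    },
}
config = NumericalFeatureConfig(feature_entry)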
2 changes: 1 addition & 1 deletion graphstorm-processing/graphstorm_processing/constants.py
@@ -42,4 +42,4 @@

################# Numerical transformations ################
VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"]
VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"]
@@ -15,6 +15,7 @@
"""
import logging
from typing import Optional, Sequence
import uuid

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
@@ -26,6 +27,10 @@
from pyspark.ml.functions import array_to_vector, vector_to_array

import numpy as np
import pandas as pd

# pylint: disable = no-name-in-module
from scipy.special import erfinv

from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS
from .base_dist_transformation import DistributedTransformation
@@ -74,7 +79,9 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: Data
    return imputed_df


def apply_norm(
    cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, epsilon: float = 1e-6
) -> DataFrame:
"""Applies a single normalizer to the imputed dataframe, individually to each of the columns
provided in the cols argument.
@@ -84,10 +91,13 @@ def apply_norm(cols: Sequence[str], shared_norm: str, imputed_df: DataFrame) ->
        List of column names to apply normalization to.
    shared_norm : str
        The type of normalization to use. Valid values are "none", "min-max",
        "standard", "rank-gauss".
    imputed_df : DataFrame
        The input DataFrame to apply normalization to. It should not contain
        missing values.
    epsilon : float
        Epsilon used by "rank-gauss" normalization to avoid infinite values
        at the endpoints of the scaled rank range.

    Returns
    -------
@@ -146,6 +156,36 @@ def single_vec_to_float(vec):
        scaled_df = imputed_df.select(
            [(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols
        )
    elif shared_norm == "rank-gauss":
        assert len(cols) == 1, "Rank-Gauss numerical transformation only supports a single column"
        column_name = cols[0]
        select_df = imputed_df.select(column_name)
        # We track two helper columns: the original-order column records the input
        # order of the rows, and the value-rank column holds the rank of each value
        # in the column. The original order lets us restore the row order at the end.
        original_order_col = f"original-order-{uuid.uuid4().hex[:8]}"
        value_rank_col = f"value-rank-{uuid.uuid4().hex[:8]}"
        df_with_order_idx = select_df.withColumn(
            original_order_col, F.monotonically_increasing_id()
        )
        value_sorted_df = df_with_order_idx.orderBy(column_name)
        value_rank_df = value_sorted_df.withColumn(value_rank_col, F.monotonically_increasing_id())

        # pylint: disable = cell-var-from-loop
        # num_rows must be defined outside the UDF body,
        # otherwise pandas_udf throws an error.
        def gauss_transform(rank: pd.Series) -> pd.Series:
            # Scale ranks from [0, num_rows - 1] to [-1, 1], then clip away
            # from the endpoints so that erfinv stays finite.
            feat_range = num_rows - 1
            clipped_rank = (rank / feat_range - 0.5) * 2
            clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
            return pd.Series(erfinv(clipped_rank))

        num_rows = value_rank_df.count()
        gauss_udf = F.pandas_udf(gauss_transform, FloatType())
        normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col))
        scaled_df = normalized_df.orderBy(original_order_col).drop(
            value_rank_col, original_order_col
        )

    return scaled_df

@@ -160,16 +200,21 @@ class DistNumericalTransformation(DistributedTransformation):
        The list of columns to apply the transformations on.
    normalizer : str
        The normalization to apply to the columns.
        Valid values are "none", "min-max", "standard", "rank-gauss".
    imputer : str
        The type of missing value imputation to apply to the column.
        Valid values are "mean", "median" and "most_frequent".
    epsilon : float
        Epsilon used by "rank-gauss" normalization to avoid infinite
        values during computation.
    """

    def __init__(
        self, cols: Sequence[str], normalizer: str, imputer: str, epsilon: float = 1e-6
    ) -> None:
        super().__init__(cols)
        self.cols = cols
        self.shared_norm = normalizer
        self.epsilon = epsilon
        # Spark uses 'mode' for the most frequent element
        self.shared_imputation = "mode" if imputer == "most_frequent" else imputer
@@ -179,7 +224,7 @@ def apply(self, input_df: DataFrame) -> DataFrame:
        )

        imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
        scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.epsilon)

        # TODO: Figure out why the transformation is producing Double values, and switch to float
        return scaled_df
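A rough usage sketch, assuming a local SparkSession (the column name and values are made up; the import path matches the unit tests below):

from pyspark.sql import SparkSession

from graphstorm_processing.data_transformations.dist_transformations import (
    DistNumericalTransformation,
)

spark = SparkSession.builder.master("local[1]").appName("rank-gauss-demo").getOrCreate()
input_df = spark.createDataFrame([(0.0,), (15.0,), (26.0,), (40.0,)], schema=["age"])

rg_transformation = DistNumericalTransformation(
    ["age"], imputer="none", normalizer="rank-gauss", epsilon=1e-6
)
output_df = rg_transformation.apply(input_df)
output_df.show()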
1 change: 1 addition & 0 deletions graphstorm-processing/pyproject.toml
@@ -18,6 +18,7 @@ joblib = "^1.3.1"
pandas = "^1.3.5"
psutil = "^5.9.5"
sagemaker = "^2.83.0"
scipy = "^1.10.1"

[tool.poetry.group.dev]
optional = true
26 changes: 26 additions & 0 deletions graphstorm-processing/tests/test_converter.py
@@ -217,6 +217,16 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
"slide_window_size": 5,
},
},
{
"feature_col": ["num_citations"],
"feature_name": "rank_gauss1",
"transform": {"name": "rank_gauss"},
},
{
"feature_col": ["num_citations"],
"feature_name": "rank_gauss2",
"transform": {"name": "rank_gauss", "epsilon": 0.1},
},
],
"labels": [
{"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}
@@ -273,6 +283,22 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
                },
            },
        },
        {
            "column": "num_citations",
            "name": "rank_gauss1",
            "transformation": {
                "name": "numerical",
                "kwargs": {"normalizer": "rank-gauss", "imputer": "none"},
            },
        },
        {
            "column": "num_citations",
            "name": "rank_gauss2",
            "transformation": {
                "name": "numerical",
                "kwargs": {"epsilon": 0.1, "normalizer": "rank-gauss", "imputer": "none"},
            },
        },
    ]
    assert nodes_output["labels"] == [
        {
@@ -17,9 +17,11 @@

import pytest
import pandas as pd
import numpy as np
from numpy.testing import assert_array_equal, assert_array_almost_equal, assert_almost_equal
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType, StringType
from scipy.special import erfinv

from graphstorm_processing.data_transformations.dist_transformations import (
DistNumericalTransformation,
@@ -297,3 +299,66 @@ def test_multi_numerical_transformation_with_array_input(spark: SparkSession, ch
    transformed_rows = transformed_df.collect()
    for row, expected_vector in zip(transformed_rows, expected_vals):
        assert_array_almost_equal(row["feat"], expected_vector, decimal=3)


def rank_gauss(feat, eps):
    # Reference implementation: rank the values, scale the ranks to
    # [-1 + eps, 1 - eps], then apply the inverse error function.
    lower = -1 + eps
    upper = 1 - eps
    value_range = upper - lower
    # Double argsort yields the rank of each element.
    i = np.argsort(feat, axis=0)
    j = np.argsort(i, axis=0)
    j_range = len(j) - 1
    divider = j_range / value_range
    feat = j / divider
    feat = feat - upper
    return erfinv(feat)


@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
def test_rank_gauss(spark: SparkSession, check_df_schema, epsilon):
    data = [(0.0,), (15.0,), (26.0,), (40.0,)]

    input_df = spark.createDataFrame(data, schema=["age"])
    rg_transformation = DistNumericalTransformation(
        ["age"], imputer="none", normalizer="rank-gauss", epsilon=epsilon
    )

    output_df = rg_transformation.apply(input_df)
    check_df_schema(output_df)

    out_rows = output_df.collect()

    expected_vals = rank_gauss(np.array([[0.0], [15.0], [26.0], [40.0]]), epsilon)
    for i, row in enumerate(out_rows):
        assert_almost_equal(
            [row["age"]], expected_vals[i, :], decimal=4, err_msg=f"Row {i} is not equal"
        )


@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
def test_rank_gauss_reshuffling(spark: SparkSession, check_df_schema, epsilon):
    # Create DF with 1k random values
    random_values = np.random.rand(10**3, 1)

    # Convert the array of values into a list of single-value lists
    data = [(float(value),) for value in random_values]
    input_df = spark.createDataFrame(data, schema=["rand"])
    # repartition dataset
    input_df = input_df.repartition(4)
    # collect partitioned data pre-transform
    part_rows = [[row["rand"]] for row in input_df.collect()]

    rg_transformation = DistNumericalTransformation(
        ["rand"], imputer="none", normalizer="rank-gauss", epsilon=epsilon
    )

    output_df = rg_transformation.apply(input_df)
    check_df_schema(output_df)

    out_rows = output_df.collect()

    expected_vals = rank_gauss(np.array(part_rows), epsilon)
    for i, row in enumerate(out_rows):
        assert_almost_equal(
            [row["rand"]], expected_vals[i, :], decimal=4, err_msg=f"Row {i} is not equal"
        )
