From 618728380e59029420491ad3a1523cde02b8fca4 Mon Sep 17 00:00:00 2001
From: JalenCato
Date: Thu, 2 Nov 2023 22:17:39 +0000
Subject: [PATCH] add first unit test

---
 ...dist_rankgauss_numerical_transformation.py | 12 ++--
 .../test_dist_rankgauss_transformation.py     | 56 +++++++++++++++++++
 2 files changed, 62 insertions(+), 6 deletions(-)
 create mode 100644 graphstorm-processing/tests/test_dist_rankgauss_transformation.py

diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_rankgauss_numerical_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_rankgauss_numerical_transformation.py
index 6f71a8464b..08f11bf030 100644
--- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_rankgauss_numerical_transformation.py
+++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_rankgauss_numerical_transformation.py
@@ -49,7 +49,7 @@ def __init__(
     ) -> None:
         super().__init__(cols)
         self.cols = cols
-        assert len(self.cols) == 1, "Bucket numerical transformation only supports single column"
+        assert len(self.cols) == 1, "Rank Gauss numerical transformation only supports single column"
         # Spark uses 'mode' for the most frequent element
         self.shared_imputation = "mode" if imputer == "most_frequent" else imputer
         self.epsilon = epsilon
@@ -59,8 +59,9 @@ def get_transformation_name() -> str:
         return "DistRankGaussNumericalTransformation"
 
     def apply(self, input_df: DataFrame) -> DataFrame:
-        imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
-        column_name = input_df.columns[0]
+        column_name = self.cols[0]
+        select_df = input_df.select(column_name)
+        imputed_df = apply_imputation(self.cols, self.shared_imputation, select_df)
 
         id_df = imputed_df.withColumn('id', F.monotonically_increasing_id())
         sorted_df = id_df.orderBy(column_name)
@@ -69,14 +70,13 @@ def apply(self, input_df: DataFrame) -> DataFrame:
         def gauss_transform(rank: pd.Series) -> pd.Series:
             epsilon = self.epsilon
             feat_range = num_rows - 1
-            normalized_rank = (rank - 1) / feat_range
-            clipped_rank = (normalized_rank - 0.5) * 2
+            clipped_rank = (rank / feat_range - 0.5) * 2
             clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
             return pd.Series(erfinv(clipped_rank))
 
         gauss_udf = F.pandas_udf(gauss_transform, FloatType())
         num_rows = indexed_df.count()
         normalized_df = indexed_df.withColumn(column_name, gauss_udf('index'))
-        gauss_transformed_df = normalized_df.orderBy('id').drop('id', 'index')
+        gauss_transformed_df = normalized_df.orderBy('id').drop("index", "id")
 
         return gauss_transformed_df
diff --git a/graphstorm-processing/tests/test_dist_rankgauss_transformation.py b/graphstorm-processing/tests/test_dist_rankgauss_transformation.py
new file mode 100644
index 0000000000..8bd5d99026
--- /dev/null
+++ b/graphstorm-processing/tests/test_dist_rankgauss_transformation.py
@@ -0,0 +1,56 @@
+"""
+Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License").
+You may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from pyspark.sql import DataFrame, SparkSession
+import pytest
+import numpy as np
+from numpy.testing import assert_almost_equal
+from scipy.special import erfinv
+from graphstorm_processing.data_transformations.dist_transformations import (
+    DistRankGaussNumericalTransformation,
+)
+
+
+def rank_gauss(feat, eps):
+    lower = -1 + eps
+    upper = 1 - eps
+    value_range = upper - lower
+    i = np.argsort(feat, axis=0)
+    j = np.argsort(i, axis=0)
+    j_range = len(j) - 1
+    divider = j_range / value_range
+    feat = j / divider
+    feat = feat - upper
+    return erfinv(feat)
+
+
+@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
+def test_rank_gauss(spark: SparkSession, check_df_schema, epsilon):
+    data = [("mark", 0.0, None), ("john", 15.0, 10000), ("tara", 26.0, 20000), ("jen", 40.0, 10000)]
+
+    columns = ["name", "age", "salary"]
+    input_df = spark.createDataFrame(data, schema=columns)
+    rg_transformation = DistRankGaussNumericalTransformation(
+        ["age"], epsilon=epsilon
+    )
+
+    output_df = rg_transformation.apply(input_df)
+    check_df_schema(output_df)
+
+    out_rows = output_df.collect()
+
+    expected_vals = rank_gauss(np.array([[0.0], [15.0], [26.0], [40.0]]), epsilon)
+    for i, row in enumerate(out_rows):
+        assert_almost_equal([row["age"]], expected_vals[i, :], decimal=4, err_msg=f"Row {i} is not equal")
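
Note for reviewers (not part of the patch): below is a minimal NumPy sketch of the rank-Gauss mapping that the updated gauss_transform implements, assuming the 'index' column holds 0-based ranks (which the change from (rank - 1) / feat_range to rank / feat_range implies). The name rank_gauss_reference is illustrative, not from the codebase:

    import numpy as np
    from scipy.special import erfinv

    def rank_gauss_reference(values: np.ndarray, epsilon: float) -> np.ndarray:
        # Double argsort converts the values into their 0..n-1 ranks.
        ranks = np.argsort(np.argsort(values))
        feat_range = len(values) - 1
        # Spread the ranks evenly over [-1, 1] ...
        scaled = (ranks / feat_range - 0.5) * 2
        # ... then clip to [epsilon - 1, 1 - epsilon] so erfinv stays finite.
        clipped = np.clip(scaled, epsilon - 1, 1 - epsilon)
        # erfinv of evenly spaced points in (-1, 1) gives approximately
        # standard-normal outputs regardless of the input distribution.
        return erfinv(clipped)

For example, rank_gauss_reference(np.array([0.0, 15.0, 26.0, 40.0]), 1e-6) agrees with the test's rank_gauss helper well within the decimal=4 tolerance used above. With epsilon=0.0 the extreme ranks map to erfinv(±1) = ±inf, which numpy's assert_almost_equal treats as equal when the signs match.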
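
A second note (also not part of the patch): the hunks use an 'index' column on indexed_df whose construction falls outside the context shown. The snippet below sketches the surrounding tag/rank/restore pattern under the assumption that the rank comes from a 1-based row_number over a global window; only the 'id' tagging and the final orderBy/drop are visible in the diff itself:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(15.0,), (0.0,), (40.0,), (26.0,)], ["age"])

    # Tag rows so the original order survives the value sort (as in the patch).
    id_df = df.withColumn("id", F.monotonically_increasing_id())
    # Assumed rank step: row_number() is 1-based, so subtract 1 to get the
    # 0-based ranks the updated UDF formula expects. A window without
    # partitionBy funnels all rows through one partition, so this pattern
    # only suits modest row counts.
    indexed_df = id_df.withColumn(
        "index", F.row_number().over(Window.orderBy("age")) - 1
    )
    # After the gauss UDF overwrites the feature column from "index", the
    # patch restores the input order and drops both helper columns:
    restored = indexed_df.orderBy("id").drop("index", "id")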