Commit

apply comments
jalencato committed Nov 7, 2023
1 parent 75d7b55 commit 9e31d8f
Showing 3 changed files with 31 additions and 29 deletions.
Changed file 1 of 3
@@ -115,14 +115,12 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
elif gconstruct_transform_dict["name"] == "rank_gauss":
gsp_transformation_dict["name"] = "numerical"
if "epsilon" in gconstruct_transform_dict:
# pylint: disable=line-too-long
gsp_transformation_dict["kwargs"] = {
"epsilon": gconstruct_transform_dict["epsilon"],
"normalizer": "rank-gauss",
"imputer": "none",
}
else:
# pylint: disable=line-too-long
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
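For context, here is a minimal standalone sketch of what this branch of the converter emits for a rank_gauss feature transform. The helper name convert_rank_gauss is made up for illustration; only the dictionary contents mirror the diff above.

def convert_rank_gauss(gconstruct_transform_dict: dict) -> dict:
    # Hypothetical standalone version of the rank_gauss branch shown above.
    gsp_transformation_dict = {"name": "numerical"}
    if "epsilon" in gconstruct_transform_dict:
        gsp_transformation_dict["kwargs"] = {
            "epsilon": gconstruct_transform_dict["epsilon"],
            "normalizer": "rank-gauss",
            "imputer": "none",
        }
    else:
        gsp_transformation_dict["kwargs"] = {
            "normalizer": "rank-gauss",
            "imputer": "none",
        }
    return gsp_transformation_dict

print(convert_rank_gauss({"name": "rank_gauss", "epsilon": 1e-6}))
# {'name': 'numerical', 'kwargs': {'epsilon': 1e-06, 'normalizer': 'rank-gauss', 'imputer': 'none'}}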
Changed file 2 of 3
@@ -157,30 +157,34 @@ def single_vec_to_float(vec):
[(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols
)
elif shared_norm == "rank-gauss":
assert len(cols) == 1, "Rank Guass numerical transformation only supports single column"
for column_name in cols:
select_df = imputed_df.select(column_name)
original_id_rank = f"id_{uuid.uuid4().hex[8]}"
index_id_rank = f"index_{uuid.uuid4().hex[8]}"
id_df = select_df.withColumn(original_id_rank, F.monotonically_increasing_id())
sorted_df = id_df.orderBy(column_name)
indexed_df = sorted_df.withColumn(index_id_rank, F.monotonically_increasing_id())

# pylint: disable = cell-var-from-loop
# It is required to put num_rows definition outside,
# or pandas.udf will throw an error
def gauss_transform(rank: pd.Series) -> pd.Series:
feat_range = num_rows - 1
clipped_rank = (rank / feat_range - 0.5) * 2
clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
return pd.Series(erfinv(clipped_rank))

num_rows = indexed_df.count()
gauss_udf = F.pandas_udf(gauss_transform, FloatType())
normalized_df = indexed_df.withColumn(column_name, gauss_udf(index_id_rank))
scaled_df = normalized_df.orderBy(original_id_rank).drop(
index_id_rank, original_id_rank
)
assert len(cols) == 1, "Rank-Gauss numerical transformation only supports a single column"
column_name = cols[0]
select_df = imputed_df.select(column_name)
# original_order_col records the row order of the input data frame; value_rank_col is
# the rank after sorting by value. The original order is kept so we can restore it later.
original_order_col = f"original-order-{uuid.uuid4().hex[:8]}"
value_rank_col = f"value-rank-{uuid.uuid4().hex[:8]}"
df_with_order_idx = select_df.withColumn(
original_order_col, F.monotonically_increasing_id()
)
value_sorted_df = df_with_order_idx.orderBy(column_name)
value_rank_df = value_sorted_df.withColumn(value_rank_col, F.monotonically_increasing_id())

# num_rows must be defined outside the UDF (in the enclosing scope),
# otherwise the pandas UDF will throw an error.
def gauss_transform(rank: pd.Series) -> pd.Series:
feat_range = num_rows - 1
clipped_rank = (rank / feat_range - 0.5) * 2
clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
return pd.Series(erfinv(clipped_rank))

num_rows = value_rank_df.count()
gauss_udf = F.pandas_udf(gauss_transform, FloatType())
normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col))
scaled_df = normalized_df.orderBy(original_order_col).drop(
value_rank_col, original_order_col
)

return scaled_df

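As a plain NumPy/SciPy reference for the math in gauss_transform, the sketch below ranks the values, rescales the ranks into (epsilon - 1, 1 - epsilon), and applies the inverse error function. The function name rank_gauss_reference is illustrative, and it assumes the ranks are the consecutive integers 0 .. num_rows - 1.

import numpy as np
from scipy.special import erfinv

def rank_gauss_reference(values: np.ndarray, epsilon: float) -> np.ndarray:
    """Rank-Gauss on a 1-D array, mirroring the pandas UDF above."""
    num_rows = len(values)
    # Rank of each value when sorted ascending: 0 .. num_rows - 1.
    rank = np.argsort(np.argsort(values)).astype(np.float64)
    feat_range = num_rows - 1
    # Map ranks into (-1, 1), keeping epsilon away from the endpoints.
    clipped_rank = (rank / feat_range - 0.5) * 2
    clipped_rank = np.maximum(np.minimum(clipped_rank, 1 - epsilon), epsilon - 1)
    # The inverse error function maps the uniform ranks onto a Gaussian shape.
    return erfinv(clipped_rank)

print(rank_gauss_reference(np.array([0.0, 15.0, 26.0, 40.0]), epsilon=1e-6))

With epsilon = 0 the extreme ranks map to exactly -1 and 1, where erfinv returns -inf and inf, which is presumably why the transformation exposes the epsilon parameter at all.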
Changed file 3 of 3
@@ -22,6 +22,7 @@
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType, StringType
from scipy.special import erfinv

from graphstorm_processing.data_transformations.dist_transformations import (
DistNumericalTransformation,
DistMultiNumericalTransformation,
@@ -315,10 +316,9 @@ def rank_gauss(feat, eps):

@pytest.mark.parametrize("epsilon", [0.0, 1e-6])
def test_rank_gauss(spark: SparkSession, check_df_schema, epsilon):
data = [("mark", 0.0, None), ("john", 15.0, 10000), ("tara", 26.0, 20000), ("jen", 40.0, 10000)]
data = [(0.0,), (15.0,), (26.0,), (40.0,)]

columns = ["name", "age", "salary"]
input_df = spark.createDataFrame(data, schema=columns)
input_df = spark.createDataFrame(data, schema=["age"])
rg_transformation = DistNumericalTransformation(
["age"], imputer="none", normalizer="rank-gauss", epsilon=epsilon
)
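The expected values for this single-column test can be derived outside Spark in a few lines; the variable names below are illustrative, and the ages and epsilon values come from the test data above.

import numpy as np
from scipy.special import erfinv

ages = np.array([0.0, 15.0, 26.0, 40.0])
epsilon = 1e-6

# The input is already sorted ascending, so the ranks are simply 0..3.
rank = np.arange(len(ages), dtype=np.float64)
clipped = np.clip((rank / (len(ages) - 1) - 0.5) * 2, epsilon - 1, 1 - epsilon)
expected = erfinv(clipped)
print(expected)  # symmetric around 0: [-erfinv(1 - eps), -erfinv(1/3), erfinv(1/3), erfinv(1 - eps)]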
