[GSProcessing] Allow custom out_dtype in feature transformation (#739)
*Issue #, if available:*

*Description of changes:*

* Add out_dtype support for the no-op, numerical, and multi-numerical
transformations: float32 maps to Spark's FloatType and float64 to DoubleType.

* Allow the out_dtype setting to carry over when converting GConstruct
configurations to GSProcessing.
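
For illustration, a GSProcessing feature entry using the new option might look
like the following minimal sketch (the column name is hypothetical, and
out_dtype defaults to float32 when omitted):

# Hypothetical feature entry; only "out_dtype" is new in this PR.
feature_entry = {
    "column": "age",
    "transformation": {
        "name": "numerical",
        "kwargs": {
            "normalizer": "min-max",
            "imputer": "none",
            "out_dtype": "float64",  # "float32" (default) or "float64"
        },
    },
}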

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: runjie <[email protected]>
Co-authored-by: Theodore Vasiloudis <[email protected]>
3 people authored Mar 23, 2024
1 parent 37135af commit ee4c49c
Showing 10 changed files with 215 additions and 32 deletions.
4 changes: 4 additions & 0 deletions docs/source/gs-processing/developer/input-configuration.rst
@@ -391,6 +391,8 @@ arguments.
- ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-gauss first ranks all values,
converts the ranks to the -1/1 range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
to a Gaussian distribution shape. This transformation only supports a single column as input.
+- ``out_dtype`` (Optional): Specify the data type of the transformed feature.
+  Currently only ``float32`` and ``float64`` are supported.
- ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
to avoid infinite values during normalization.
- ``multi-numerical``
@@ -406,6 +408,8 @@ arguments.
- ``separator`` (String, optional): Same as for ``no-op`` transformation, used to separate numerical
values in CSV input. If the input data are in Parquet format, each value in the
column is assumed to be an array of floats.
+- ``out_dtype`` (Optional): Specify the data type of the transformed feature.
+  Currently only ``float32`` and ``float64`` are supported.

- ``bucket-numerical``

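As a concrete illustration of the rank-gauss description above, here is a
minimal standalone NumPy/SciPy sketch (the library applies the same idea
through a pandas_udf over a Spark DataFrame; clipping is one way to realize
the epsilon safeguard against infinite values):

import numpy as np
from scipy.special import erfinv

def rank_gauss(values: np.ndarray, epsilon: float = 1e-6) -> np.ndarray:
    ranks = values.argsort().argsort()              # rank of each value, 0..n-1
    scaled = 2.0 * ranks / (len(values) - 1) - 1.0  # map ranks onto [-1, 1]
    # keep values strictly inside (-1, 1) so erfinv stays finite
    clipped = np.clip(scaled, -1.0 + epsilon, 1.0 - epsilon)
    return erfinv(clipped)

print(rank_gauss(np.array([3.0, 1.0, 2.0, 10.0])))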
@@ -102,7 +102,15 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:

if gconstruct_transform_dict["name"] == "max_min_norm":
gsp_transformation_dict["name"] = "numerical"
gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "none"}
gsp_transformation_dict["kwargs"] = {
"normalizer": "min-max",
"imputer": "none",
}

if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "bucket_numerical":
gsp_transformation_dict["name"] = "bucket-numerical"
assert (
@@ -119,17 +127,19 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
}
elif gconstruct_transform_dict["name"] == "rank_gauss":
gsp_transformation_dict["name"] = "numerical"
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
}

if "epsilon" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"] = {
"epsilon": gconstruct_transform_dict["epsilon"],
"normalizer": "rank-gauss",
"imputer": "none",
}
else:
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
}
gsp_transformation_dict["kwargs"]["epsilon"] = gconstruct_transform_dict[
"epsilon"
]
if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "to_categorical":
if "separator" in gconstruct_transform_dict:
gsp_transformation_dict["name"] = "multi-categorical"
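A condensed, standalone sketch of the conversion behavior added above (an
illustrative re-implementation, not the converter's actual API):

def convert_numerical_kwargs(gconstruct_transform: dict) -> dict:
    """Mimic how a GConstruct max_min_norm entry maps to GSProcessing kwargs."""
    kwargs = {"normalizer": "min-max", "imputer": "none"}
    # out_dtype carries over only when it is one of the supported values
    if gconstruct_transform.get("out_dtype") in ["float32", "float64"]:
        kwargs["out_dtype"] = gconstruct_transform["out_dtype"]
    return kwargs

assert convert_numerical_kwargs({"name": "max_min_norm", "out_dtype": "float64"}) == {
    "normalizer": "min-max",
    "imputer": "none",
    "out_dtype": "float64",
}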
@@ -17,6 +17,8 @@
import abc
from typing import Any, Mapping, Sequence

+from graphstorm_processing.constants import VALID_OUTDTYPE, TYPE_FLOAT32
+
from .data_config_base import DataStorageConfig


@@ -96,6 +98,7 @@ def __init__(self, config: Mapping):
super().__init__(config)

self.value_separator = None
+self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
if self._transformation_kwargs:
self.value_separator = self._transformation_kwargs.get("separator")

@@ -105,3 +108,6 @@ def _sanity_check(self) -> None:
super()._sanity_check()
if self._data_config and self.value_separator and self._data_config.format != "csv":
raise RuntimeError("separator should only be provided for CSV data")
+assert (
+    self.out_dtype in VALID_OUTDTYPE
+), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"
@@ -17,7 +17,13 @@
from typing import Mapping
import numbers

-from graphstorm_processing.constants import VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    VALID_OUTDTYPE,
+    TYPE_FLOAT32,
+)

from .feature_config_base import FeatureConfig


@@ -42,12 +48,17 @@ class NumericalFeatureConfig(FeatureConfig):
and then dividing it by the difference between the maximum value and the minimum.
* "standard": Normalize each value by dividing it by the sum of all the values.
* "rank-gauss": Normalize each value by rank gauss normalization.
+out_dtype: str
+Output feature dtype. Currently, we support ``float32`` and ``float64``.
+Default is ``float32``.
"""

def __init__(self, config: Mapping):
super().__init__(config)
self.imputer = self._transformation_kwargs.get("imputer", "none")
self.norm = self._transformation_kwargs.get("normalizer", "none")
+self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
+
self._sanity_check()

@@ -59,6 +70,9 @@ def _sanity_check(self) -> None:
assert (
self.norm in VALID_NORMALIZERS
), f"Unknown normalizer requested, expected one of {VALID_NORMALIZERS}, got {self.norm}"
+assert (
+    self.out_dtype in VALID_OUTDTYPE
+), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"


class MultiNumericalFeatureConfig(NumericalFeatureConfig):
6 changes: 6 additions & 0 deletions graphstorm-processing/graphstorm_processing/constants.py
@@ -14,6 +14,8 @@
limitations under the License.
"""

+from pyspark.sql.types import FloatType, DoubleType
+
################### Categorical Limits #######################
MAX_CATEGORIES_PER_FEATURE = 100
RARE_CATEGORY = "GSP_CONSTANT_OTHER"
@@ -44,6 +46,10 @@
################# Numerical transformations ################
VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"]
VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"]
+TYPE_FLOAT32 = "float32"
+TYPE_FLOAT64 = "float64"
+VALID_OUTDTYPE = [TYPE_FLOAT32, TYPE_FLOAT64]
+DTYPE_MAP = {TYPE_FLOAT32: FloatType(), TYPE_FLOAT64: DoubleType()}

################# Bert transformations ################
HUGGINGFACE_TRANFORM = "huggingface"
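DTYPE_MAP is what lets the string option select a Spark SQL type at
transformation time. A quick sanity check (requires a local pyspark
installation; the one-column DataFrame is made up for the example):

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, DoubleType

DTYPE_MAP = {"float32": FloatType(), "float64": DoubleType()}

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.5,)], ["feat"])
df.select(df["feat"].cast(DTYPE_MAP["float32"]).alias("feat")).printSchema()
# root
#  |-- feat: float (nullable = true)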
@@ -18,9 +18,10 @@

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, DoubleType, NumericType
+from pyspark.sql.types import ArrayType, NumericType

-from graphstorm_processing.constants import SPECIAL_CHARACTERS
+from graphstorm_processing.constants import SPECIAL_CHARACTERS, DTYPE_MAP, TYPE_FLOAT32
+
from .base_dist_transformation import DistributedTransformation


@@ -38,12 +39,17 @@ class NoopTransformation(DistributedTransformation):
The list of columns to parse as floats or lists of float
separator : Optional[str], optional
Optional separator to use to split the string, by default None
+out_dtype: str
+The output feature dtype
"""

-def __init__(self, cols: List[str], separator: Optional[str] = None) -> None:
+def __init__(
+    self, cols: List[str], out_dtype: str = TYPE_FLOAT32, separator: Optional[str] = None
+) -> None:
super().__init__(cols)
# TODO: Support multiple cols?

+self.out_dtype = out_dtype
self.separator = separator
# Spark's split function uses a regexp so we need to
# escape special chars to be used as separators
@@ -78,7 +84,7 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[flo
return None

strvec_to_float_vec_udf = F.udf(
-str_list_to_float_vec, ArrayType(DoubleType(), containsNull=False)
+str_list_to_float_vec, ArrayType(DTYPE_MAP[self.out_dtype], containsNull=False)
)

if self.separator:
@@ -92,7 +98,10 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[flo
return input_df
else:
return input_df.select(
-[F.col(column).cast(DoubleType()).alias(column) for column in self.cols]
+[
+    F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column)
+    for column in self.cols
+]
)

@staticmethod
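To see the no-op CSV path end to end (split on the escaped separator, parse
the floats, emit an array column whose element type follows out_dtype), here
is a small sketch under the same assumptions; FloatType() corresponds to the
float32 default, and the separator and data are made up:

from typing import List, Optional

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("0.5|1.5",)], ["feat"])

def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
    return [float(x) for x in string_list] if string_list is not None else None

# "|" is a regex special character, so it is escaped before splitting,
# just as the transformation escapes SPECIAL_CHARACTERS
to_vec = F.udf(str_list_to_float_vec, ArrayType(FloatType(), containsNull=False))
df.select(to_vec(F.split(F.col("feat"), r"\|")).alias("feat")).show()
# +----------+
# |      feat|
# +----------+
# |[0.5, 1.5]|
# +----------+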
@@ -20,7 +20,7 @@

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, FloatType
+from pyspark.sql.types import ArrayType
from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct
from pyspark.ml.linalg import DenseVector
from pyspark.ml.stat import Summarizer
@@ -33,7 +33,13 @@
# pylint: disable = no-name-in-module
from scipy.special import erfinv

-from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    SPECIAL_CHARACTERS,
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    DTYPE_MAP,
+    TYPE_FLOAT32,
+)
from .base_dist_transformation import DistributedTransformation
from ..spark_utils import rename_multiple_cols

@@ -81,7 +87,11 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: Data


def apply_norm(
-cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, epsilon: float = 1e-6
+cols: Sequence[str],
+shared_norm: str,
+imputed_df: DataFrame,
+out_dtype: str = TYPE_FLOAT32,
+epsilon: float = 1e-6,
) -> DataFrame:
"""Applies a single normalizer to the imputed dataframe, individually to each of the columns
provided in the cols argument.
@@ -96,6 +106,8 @@
imputed_df : DataFrame
The input DataFrame to apply normalization to. It should not contain
missing values.
+out_dtype: str
+The output feature dtype.
epsilon: float
Epsilon for normalization used to avoid INF float during computation
on "rank-gauss".
@@ -109,20 +121,29 @@
------
RuntimeError
If missing values exist in the data when the "standard" normalizer is used.
+ValueError
+If an unsupported feature output dtype is provided.
"""
other_cols = list(set(imputed_df.columns).difference(cols))

def single_vec_to_float(vec):
return float(vec[0])

-vec_udf = F.udf(single_vec_to_float, FloatType())
+# Use the map to get the corresponding data type object, or raise an error if not found
+if out_dtype in DTYPE_MAP:
+    vec_udf = F.udf(single_vec_to_float, DTYPE_MAP[out_dtype])
+else:
+    raise ValueError("Unsupported feature output dtype")
+
assert shared_norm in VALID_NORMALIZERS, (
f"Unsupported normalization requested: {shared_norm}, the supported "
f"strategies are : {VALID_NORMALIZERS}"
)

if shared_norm == "none":
+# Skip the cast when no normalization is applied,
+# saving unnecessary compute
scaled_df = imputed_df
elif shared_norm == "min-max":
# Because the scalers expect Vector input, we need to use VectorAssembler on each,
@@ -155,7 +176,8 @@ def single_vec_to_float(vec):
"normalization. Use an imputer in the transformation."
)
scaled_df = imputed_df.select(
-[(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols
+[(F.col(c) / col_sums[f"sum({c})"]).cast(DTYPE_MAP[out_dtype]).alias(c) for c in cols]
++ other_cols
)
elif shared_norm == "rank-gauss":
assert len(cols) == 1, "Rank-Gauss numerical transformation only supports a single column"
@@ -182,7 +204,7 @@ def gauss_transform(rank: pd.Series) -> pd.Series:
return pd.Series(erfinv(clipped_rank))

num_rows = value_rank_df.count()
-gauss_udf = F.pandas_udf(gauss_transform, FloatType())
+gauss_udf = F.pandas_udf(gauss_transform, DTYPE_MAP[out_dtype])
normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col))
scaled_df = normalized_df.orderBy(original_order_col).drop(
value_rank_col, original_order_col
@@ -205,17 +227,25 @@ class DistNumericalTransformation(DistributedTransformation):
imputer : str
The type of missing value imputation to apply to the column.
Valid values are "mean", "median" and "most_frequent".
+out_dtype: str
+Output feature dtype
epsilon: float
Epsilon for normalization used to avoid INF float during computation.
"""

def __init__(
-self, cols: Sequence[str], normalizer: str, imputer: str, epsilon: float = 1e-6
+self,
+cols: Sequence[str],
+normalizer: str,
+imputer: str,
+out_dtype: str = TYPE_FLOAT32,
+epsilon: float = 1e-6,
) -> None:
super().__init__(cols)
self.cols = cols
self.shared_norm = normalizer
self.epsilon = epsilon
+self.out_dtype = out_dtype
# Spark uses 'mode' for the most frequent element
self.shared_imputation = "mode" if imputer == "most_frequent" else imputer

@@ -225,7 +255,9 @@ def apply(self, input_df: DataFrame) -> DataFrame:
)

imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
-scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.epsilon)
+scaled_df = apply_norm(
+    self.cols, self.shared_norm, imputed_df, self.out_dtype, self.epsilon
+)

# TODO: Figure out why the transformation is producing Double values, and switch to float
return scaled_df
@@ -251,10 +283,17 @@ class DistMultiNumericalTransformation(DistNumericalTransformation):
imputer : str
The type of missing value imputation to apply to the column.
Valid values are "mean", "median" and "most_frequent".
+out_dtype: str
+Output feature dtype
"""

def __init__(
-self, cols: Sequence[str], separator: Optional[str], normalizer: str, imputer: str
+self,
+cols: Sequence[str],
+separator: Optional[str],
+normalizer: str,
+imputer: str,
+out_dtype: str = TYPE_FLOAT32,
) -> None:
assert (
len(cols) == 1
@@ -269,6 +308,7 @@ def __init__(
# special chars to be used as separators
if self.separator in SPECIAL_CHARACTERS:
self.separator = f"\\{self.separator}"
+self.out_dtype = out_dtype

@staticmethod
def get_transformation_name() -> str:
@@ -343,7 +383,7 @@ def convert_multistring_to_sequence_df(
), # Split along the separator
replace_empty_with_nan,
)
-.cast(ArrayType(FloatType(), True))
+.cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
.alias(self.multi_column)
)

@@ -405,7 +445,7 @@ def vector_df_has_nan(vector_df: DataFrame, vector_col: str) -> bool:
else:
split_array_df = input_df.select(
F.col(self.multi_column)
-.cast(ArrayType(FloatType(), True))
+.cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
.alias(self.multi_column)
)

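The multi-numerical path relies on the same DTYPE_MAP when casting a split
string column to a typed array. A sketch with a made-up column and ";"
separator (requires pyspark):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1.0;2.0;3.0",)], ["feats"])

# out_dtype="float64" would select DoubleType() through DTYPE_MAP
arr = df.select(
    F.split(F.col("feats"), ";").cast(ArrayType(DoubleType(), True)).alias("feats")
)
arr.printSchema()
# root
#  |-- feats: array (nullable = true)
#  |    |-- element: double (containsNull = true)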
@@ -41,7 +41,8 @@
"name": "numerical",
"kwargs": {
"normalizer": "none",
"imputer": "mean"
"imputer": "mean",
"out_dtype": "float64"
}
}
},