[GSProcessing] Allow custom out_dtype in feature transformation #739

Merged (28 commits, Mar 23, 2024)
4 changes: 4 additions & 0 deletions docs/source/gs-processing/developer/input-configuration.rst
@@ -391,6 +391,8 @@ arguments.
- ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-gauss first ranks all values,
converts the ranks to the -1/1 range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
to a Gaussian distribution shape. This transformation only supports a single column as input.
- ``out_dtype`` (Optional): Specify the data type of the transformed feature.
  Currently only ``float32`` and ``float64`` are supported.
- ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
to avoid infinite values during normalization.
- ``multi-numerical``
@@ -406,6 +408,8 @@ arguments.
- ``separator`` (String, optional): Same as for ``no-op`` transformation, used to separate numerical
values in CSV input. If the input data are in Parquet format, each value in the
column is assumed to be an array of floats.
- ``out_dtype`` (Optional): Specify the data type of the transformed feature.
  Currently only ``float32`` and ``float64`` are supported; see the example sketch below.
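For illustration, a feature entry that uses the new option could look like the following sketch, written here as a Python dict. The column name and surrounding structure are illustrative, mirroring the test fixture at the end of this PR:

feature_entry = {
    "column": "age",  # hypothetical column name
    "transformation": {
        "name": "numerical",
        "kwargs": {
            "normalizer": "min-max",
            "imputer": "none",
            # New option from this PR; defaults to "float32" when omitted
            "out_dtype": "float64",
        },
    },
}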

- ``bucket-numerical``

@@ -102,7 +102,15 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:

if gconstruct_transform_dict["name"] == "max_min_norm":
gsp_transformation_dict["name"] = "numerical"
gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "none"}
gsp_transformation_dict["kwargs"] = {
"normalizer": "min-max",
"imputer": "none",
}

if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "bucket_numerical":
gsp_transformation_dict["name"] = "bucket-numerical"
assert (
@@ -119,17 +127,19 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
}
elif gconstruct_transform_dict["name"] == "rank_gauss":
gsp_transformation_dict["name"] = "numerical"
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
}

if "epsilon" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"] = {
"epsilon": gconstruct_transform_dict["epsilon"],
"normalizer": "rank-gauss",
"imputer": "none",
}
else:
gsp_transformation_dict["kwargs"] = {
"normalizer": "rank-gauss",
"imputer": "none",
}
gsp_transformation_dict["kwargs"]["epsilon"] = gconstruct_transform_dict[
"epsilon"
]
if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "to_categorical":
if "separator" in gconstruct_transform_dict:
gsp_transformation_dict["name"] = "multi-categorical"
@@ -17,6 +17,8 @@
import abc
from typing import Any, Mapping, Sequence

from graphstorm_processing.constants import VALID_OUTDTYPE, TYPE_FLOAT32

from .data_config_base import DataStorageConfig


@@ -96,6 +98,7 @@ def __init__(self, config: Mapping):
super().__init__(config)

self.value_separator = None
self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
if self._transformation_kwargs:
self.value_separator = self._transformation_kwargs.get("separator")

@@ -105,3 +108,6 @@ def _sanity_check(self) -> None:
super()._sanity_check()
if self._data_config and self.value_separator and self._data_config.format != "csv":
raise RuntimeError("separator should only be provided for CSV data")
assert (
self.out_dtype in VALID_OUTDTYPE
), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"
@@ -17,7 +17,13 @@
from typing import Mapping
import numbers

-from graphstorm_processing.constants import VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    VALID_OUTDTYPE,
+    TYPE_FLOAT32,
+)

from .feature_config_base import FeatureConfig


@@ -42,12 +48,17 @@ class NumericalFeatureConfig(FeatureConfig):
and then dividing it by the difference between the maximum value and the minimum.
* "standard": Normalize each value by dividing it by the sum of all the values.
* "rank-gauss": Normalize each value by rank gauss normalization.

    out_dtype: str
        Output feature dtype. Currently ``float32`` and ``float64`` are supported.
        Default is ``float32``.
"""

def __init__(self, config: Mapping):
super().__init__(config)
self.imputer = self._transformation_kwargs.get("imputer", "none")
self.norm = self._transformation_kwargs.get("normalizer", "none")
self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)

self._sanity_check()

@@ -59,6 +70,9 @@ def _sanity_check(self) -> None:
assert (
self.norm in VALID_NORMALIZERS
), f"Unknown normalizer requested, expected one of {VALID_NORMALIZERS}, got {self.norm}"
assert (
self.out_dtype in VALID_OUTDTYPE
), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"


class MultiNumericalFeatureConfig(NumericalFeatureConfig):
6 changes: 6 additions & 0 deletions graphstorm-processing/graphstorm_processing/constants.py
@@ -14,6 +14,8 @@
limitations under the License.
"""

from pyspark.sql.types import FloatType, DoubleType

################### Categorical Limits #######################
MAX_CATEGORIES_PER_FEATURE = 100
RARE_CATEGORY = "GSP_CONSTANT_OTHER"
@@ -44,6 +46,10 @@
################# Numerical transformations ################
VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"]
VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"]
TYPE_FLOAT32 = "float32"
TYPE_FLOAT64 = "float64"
VALID_OUTDTYPE = [TYPE_FLOAT32, TYPE_FLOAT64]
DTYPE_MAP = {TYPE_FLOAT32: FloatType(), TYPE_FLOAT64: DoubleType()}
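The string-keyed map keeps user-facing configs JSON-friendly while letting Spark code resolve a concrete type in one lookup. A quick usage sketch, assuming a local SparkSession (data and column name are illustrative):

from pyspark.sql import SparkSession
from graphstorm_processing.constants import DTYPE_MAP

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1.5",)], ["feat"])
# Cast a string column to the configured output dtype,
# as the transformations below do:
df = df.select(df["feat"].cast(DTYPE_MAP["float64"]).alias("feat"))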

################# Bert transformations ################
HUGGINGFACE_TRANFORM = "huggingface"
@@ -18,9 +18,10 @@

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, DoubleType, NumericType
+from pyspark.sql.types import ArrayType, NumericType

-from graphstorm_processing.constants import SPECIAL_CHARACTERS
+from graphstorm_processing.constants import SPECIAL_CHARACTERS, DTYPE_MAP, TYPE_FLOAT32

from .base_dist_transformation import DistributedTransformation


@@ -38,12 +39,17 @@ class NoopTransformation(DistributedTransformation):
The list of columns to parse as floats or lists of float
separator : Optional[str], optional
Optional separator to use to split the string, by default None
    out_dtype: str
        The output feature dtype, by default float32
"""

-    def __init__(self, cols: List[str], separator: Optional[str] = None) -> None:
+    def __init__(
+        self, cols: List[str], out_dtype: str = TYPE_FLOAT32, separator: Optional[str] = None
+    ) -> None:
super().__init__(cols)
# TODO: Support multiple cols?

self.out_dtype = out_dtype
self.separator = separator
# Spark's split function uses a regexp so we need to
# escape special chars to be used as separators
@@ -78,7 +84,7 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
return None

strvec_to_float_vec_udf = F.udf(
-            str_list_to_float_vec, ArrayType(DoubleType(), containsNull=False)
+            str_list_to_float_vec, ArrayType(DTYPE_MAP[self.out_dtype], containsNull=False)
)

if self.separator:
@@ -92,7 +98,10 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
return input_df
else:
return input_df.select(
-                [F.col(column).cast(DoubleType()).alias(column) for column in self.cols]
+                [
+                    F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column)
+                    for column in self.cols
+                ]
)
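A minimal usage sketch for the updated class, assuming a local SparkSession and that NoopTransformation is in scope (data and column names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1.5", "2.5")], ["feat_a", "feat_b"])

# Without a separator, apply() takes the cast branch above
noop = NoopTransformation(cols=["feat_a", "feat_b"], out_dtype="float64")
out_df = noop.apply(df)
out_df.printSchema()  # feat_a and feat_b are now DoubleType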

@staticmethod
@@ -20,7 +20,7 @@

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
-from pyspark.sql.types import ArrayType, FloatType
+from pyspark.sql.types import ArrayType
from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct
from pyspark.ml.linalg import DenseVector
from pyspark.ml.stat import Summarizer
@@ -33,7 +33,13 @@
# pylint: disable = no-name-in-module
from scipy.special import erfinv

-from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS
+from graphstorm_processing.constants import (
+    SPECIAL_CHARACTERS,
+    VALID_IMPUTERS,
+    VALID_NORMALIZERS,
+    DTYPE_MAP,
+    TYPE_FLOAT32,
+)
from .base_dist_transformation import DistributedTransformation
from ..spark_utils import rename_multiple_cols

@@ -81,7 +87,11 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: DataFrame) -> DataFrame:


def apply_norm(
-    cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, epsilon: float = 1e-6
+    cols: Sequence[str],
+    shared_norm: str,
+    imputed_df: DataFrame,
+    out_dtype: str = TYPE_FLOAT32,
+    epsilon: float = 1e-6,
) -> DataFrame:
"""Applies a single normalizer to the imputed dataframe, individually to each of the columns
provided in the cols argument.
@@ -96,6 +106,8 @@ def apply_norm(
imputed_df : DataFrame
The input DataFrame to apply normalization to. It should not contain
missing values.
out_dtype: str
The output feature dtype.
epsilon: float
Epsilon for normalization used to avoid INF float during computation
on "rank-gauss".
@@ -109,20 +121,29 @@ def apply_norm(
------
RuntimeError
If missing values exist in the data when the "standard" normalizer is used.

    ValueError
        If an unsupported output feature dtype is provided.
"""
other_cols = list(set(imputed_df.columns).difference(cols))

def single_vec_to_float(vec):
return float(vec[0])

-    vec_udf = F.udf(single_vec_to_float, FloatType())
+    # Use the map to get the corresponding data type object, or raise an error if not found
+    if out_dtype in DTYPE_MAP:
+        vec_udf = F.udf(single_vec_to_float, DTYPE_MAP[out_dtype])
+    else:
+        raise ValueError("Unsupported feature output dtype")

assert shared_norm in VALID_NORMALIZERS, (
f"Unsupported normalization requested: {shared_norm}, the supported "
f"strategies are : {VALID_NORMALIZERS}"
)

if shared_norm == "none":
        # Skip casting when no normalization is requested, saving compute
scaled_df = imputed_df
elif shared_norm == "min-max":
# Because the scalers expect Vector input, we need to use VectorAssembler on each,
@@ -155,7 +176,8 @@ def single_vec_to_float(vec):
"normalization. Use an imputer in the transformation."
)
scaled_df = imputed_df.select(
-            [(F.col(c) / col_sums[f"sum({c})"]).alias(c) for c in cols] + other_cols
+            [(F.col(c) / col_sums[f"sum({c})"]).cast(DTYPE_MAP[out_dtype]).alias(c) for c in cols]
+            + other_cols
)
elif shared_norm == "rank-gauss":
        assert len(cols) == 1, "Rank-Gauss numerical transformation only supports a single column"
@@ -182,7 +204,7 @@ def gauss_transform(rank: pd.Series) -> pd.Series:
return pd.Series(erfinv(clipped_rank))

num_rows = value_rank_df.count()
-        gauss_udf = F.pandas_udf(gauss_transform, FloatType())
+        gauss_udf = F.pandas_udf(gauss_transform, DTYPE_MAP[out_dtype])
normalized_df = value_rank_df.withColumn(column_name, gauss_udf(value_rank_col))
scaled_df = normalized_df.orderBy(original_order_col).drop(
value_rank_col, original_order_col
@@ -205,17 +227,25 @@ class DistNumericalTransformation(DistributedTransformation):
imputer : str
The type of missing value imputation to apply to the column.
Valid values are "mean", "median" and "most_frequent".
    out_dtype: str
        Output feature dtype. Default is float32.
epsilon: float
Epsilon for normalization used to avoid INF float during computation.
"""

def __init__(
-        self, cols: Sequence[str], normalizer: str, imputer: str, epsilon: float = 1e-6
+        self,
+        cols: Sequence[str],
+        normalizer: str,
+        imputer: str,
+        out_dtype: str = TYPE_FLOAT32,
+        epsilon: float = 1e-6,
) -> None:
super().__init__(cols)
self.cols = cols
self.shared_norm = normalizer
self.epsilon = epsilon
self.out_dtype = out_dtype
# Spark uses 'mode' for the most frequent element
self.shared_imputation = "mode" if imputer == "most_frequent" else imputer

@@ -225,7 +255,9 @@ def apply(self, input_df: DataFrame) -> DataFrame:
)

imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
-        scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.epsilon)
+        scaled_df = apply_norm(
+            self.cols, self.shared_norm, imputed_df, self.out_dtype, self.epsilon
+        )

# TODO: Figure out why the transformation is producing Double values, and switch to float
return scaled_df
@@ -251,10 +283,17 @@ class DistMultiNumericalTransformation(DistNumericalTransformation):
imputer : str
The type of missing value imputation to apply to the column.
Valid values are "mean", "median" and "most_frequent".
    out_dtype: str
        Output feature dtype. Default is float32.
"""

def __init__(
-        self, cols: Sequence[str], separator: Optional[str], normalizer: str, imputer: str
+        self,
+        cols: Sequence[str],
+        separator: Optional[str],
+        normalizer: str,
+        imputer: str,
+        out_dtype: str = TYPE_FLOAT32,
) -> None:
assert (
len(cols) == 1
@@ -269,6 +308,7 @@ def __init__(
# special chars to be used as separators
if self.separator in SPECIAL_CHARACTERS:
self.separator = f"\\{self.separator}"
self.out_dtype = out_dtype

@staticmethod
def get_transformation_name() -> str:
@@ -343,7 +383,7 @@ def convert_multistring_to_sequence_df(
), # Split along the separator
replace_empty_with_nan,
)
-            .cast(ArrayType(FloatType(), True))
+            .cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
.alias(self.multi_column)
)

@@ -405,7 +445,7 @@ def vector_df_has_nan(vector_df: DataFrame, vector_col: str) -> bool:
else:
split_array_df = input_df.select(
F.col(self.multi_column)
-                .cast(ArrayType(FloatType(), True))
+                .cast(ArrayType(DTYPE_MAP[self.out_dtype], True))
.alias(self.multi_column)
)

@@ -41,7 +41,8 @@
"name": "numerical",
"kwargs": {
"normalizer": "none",
"imputer": "mean"
"imputer": "mean",
"out_dtype": "float64"
}
}
},