Skip to content

Commit

Permalink
add
Browse files: browse the repository at this point in the history
  • Loading branch information
jalencato committed Feb 16, 2024
1 parent 7b7de6e commit 4fe1b10
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -104,7 +104,9 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
}

if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict["out_dtype"]
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "bucket_numerical":
gsp_transformation_dict["name"] = "bucket-numerical"
assert (
Expand All @@ -127,9 +129,13 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
}

if "epsilon" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"]["epsilon"] = gconstruct_transform_dict["epsilon"]
gsp_transformation_dict["kwargs"]["epsilon"] = gconstruct_transform_dict[
"epsilon"
]
if gconstruct_transform_dict.get("out_dtype") in ["float32", "float64"]:
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict["out_dtype"]
gsp_transformation_dict["kwargs"]["out_dtype"] = gconstruct_transform_dict[
"out_dtype"
]
elif gconstruct_transform_dict["name"] == "to_categorical":
if "separator" in gconstruct_transform_dict:
gsp_transformation_dict["name"] = "multi-categorical"
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -13,6 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import List, Optional

from pyspark.sql import DataFrame
Expand Down Expand Up @@ -41,7 +42,9 @@ class NoopTransformation(DistributedTransformation):
The output feature dtype
"""

def __init__(self, cols: List[str], out_dtype: str = "float32", separator: Optional[str] = None) -> None:
def __init__(
self, cols: List[str], out_dtype: str = "float32", separator: Optional[str] = None
) -> None:
super().__init__(cols)
# TODO: Support multiple cols?

Expand Down Expand Up @@ -94,7 +97,10 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[flo
return input_df
else:
return input_df.select(
[F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column) for column in self.cols]
[
F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column)
for column in self.cols
]
)

@staticmethod
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -13,14 +13,14 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
from typing import Optional, Sequence
import uuid
import numpy

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, DoubleType
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct
from pyspark.ml.linalg import DenseVector
from pyspark.ml.stat import Summarizer
Expand All @@ -33,7 +33,12 @@
# pylint: disable = no-name-in-module
from scipy.special import erfinv

from graphstorm_processing.constants import SPECIAL_CHARACTERS, VALID_IMPUTERS, VALID_NORMALIZERS, DTYPE_MAP
from graphstorm_processing.constants import (
SPECIAL_CHARACTERS,
VALID_IMPUTERS,
VALID_NORMALIZERS,
DTYPE_MAP,
)
from .base_dist_transformation import DistributedTransformation
from ..spark_utils import rename_multiple_cols

Expand Down Expand Up @@ -81,7 +86,11 @@ def apply_imputation(cols: Sequence[str], shared_imputation: str, input_df: Data


def apply_norm(
cols: Sequence[str], shared_norm: str, imputed_df: DataFrame, out_dtype: str = "float32", epsilon: float = 1e-6
cols: Sequence[str],
shared_norm: str,
imputed_df: DataFrame,
out_dtype: str = "float32",
epsilon: float = 1e-6,
) -> DataFrame:
"""Applies a single normalizer to the imputed dataframe, individually to each of the columns
provided in the cols argument.
Expand Down Expand Up @@ -166,7 +175,8 @@ def single_vec_to_float(vec):
"normalization. Use an imputer in the transformation."
)
scaled_df = imputed_df.select(
[(F.col(c) / col_sums[f"sum({c})"]).cast(DTYPE_MAP[out_dtype]).alias(c) for c in cols] + other_cols
[(F.col(c) / col_sums[f"sum({c})"]).cast(DTYPE_MAP[out_dtype]).alias(c) for c in cols]
+ other_cols
)
elif shared_norm == "rank-gauss":
assert len(cols) == 1, "Rank-Guass numerical transformation only supports single column"
Expand Down Expand Up @@ -223,7 +233,12 @@ class DistNumericalTransformation(DistributedTransformation):
"""

def __init__(
self, cols: Sequence[str], normalizer: str, imputer: str, out_dtype: str = "float32", epsilon: float = 1e-6
self,
cols: Sequence[str],
normalizer: str,
imputer: str,
out_dtype: str = "float32",
epsilon: float = 1e-6,
) -> None:
super().__init__(cols)
self.cols = cols
Expand All @@ -239,7 +254,9 @@ def apply(self, input_df: DataFrame) -> DataFrame:
)

imputed_df = apply_imputation(self.cols, self.shared_imputation, input_df)
scaled_df = apply_norm(self.cols, self.shared_norm, imputed_df, self.out_dtype, self.epsilon)
scaled_df = apply_norm(
self.cols, self.shared_norm, imputed_df, self.out_dtype, self.epsilon
)

# TODO: Figure out why the transformation is producing Double values, and switch to float
return scaled_df
Expand Down

0 comments on commit 4fe1b10

Please sign in to comment.