[GSProcessing] Add saving and re-applying for numerical transforms. #1085

Merged: 6 commits, merged Nov 14, 2024
2 changes: 1 addition & 1 deletion docs/source/cli/graph-construction/distributed/example.rst
@@ -260,7 +260,7 @@ the graph structure, features, and labels. In more detail:
instead of creating new ones, ensuring that models trained with the original
data can still be used with the newly transformed data. Currently, only
categorical and numerical transformations can be re-applied. Note that
the Rank-Gauss transformation cannot support re-application, it can
the Rank-Gauss transformation does not support re-application; it may
only work for transductive tasks.
* ``updated_row_counts_metadata.json``:
This file is meant to be used as the input configuration for the
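For reference, the saved JSON representation of a re-appliable numerical transformation has the following shape (mirroring the structure exercised in this PR's tests; column names and values here are illustrative)::

    {
        "cols": ["age"],
        "imputer_model": {
            "imputed_val_dict": {"age": 27.2},
            "imputer_name": "mean",
        },
        "normalizer_model": {
            "norm_name": "min-max",
            "norm_reconstruction": {
                "originalMinValues": [22.0],
                "originalMaxValues": [33.0],
            },
        },
        "out_dtype": "float32",
        "transformation_name": "DistNumericalTransformation",
    }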
@@ -31,7 +31,7 @@
)


class DistFeatureTransformer(object):
class DistFeatureTransformer:
"""
Given a feature configuration, selects the correct transformation type,
which can then be applied through a call to apply_transformation.
@@ -56,7 +56,9 @@ def __init__(
if feat_type == "no-op":
self.transformation = NoopTransformation(**default_kwargs, **args_dict)
elif feat_type == "numerical":
self.transformation = DistNumericalTransformation(**default_kwargs, **args_dict)
self.transformation = DistNumericalTransformation(
**default_kwargs, **args_dict, json_representation=json_representation
)
elif feat_type == "multi-numerical":
self.transformation = DistMultiNumericalTransformation(**default_kwargs, **args_dict)
elif feat_type == "bucket-numerical":
@@ -53,14 +53,21 @@

@dataclass
class ImputationResult:
"""Container class to store the results of imputation.
"""Dataclass to store the results of imputation.

Parameters
----------
imputed_df: DataFrame
The imputed DataFrame.
impute_representation: dict[str, dict]
A representation of the imputation.
A dict representation of the imputation applied.

Structure:
imputed_val_dict: dict[str, float]
The imputed values for each column, {col_name: imputation_val}.
Empty if no imputation was applied.
imputer_name: str
The name of the imputer used.
"""

imputed_df: DataFrame
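For illustration, an ``impute_representation`` for a mean-imputed ``age`` column would look like this (the value is example data)::

    {"imputed_val_dict": {"age": 27.2}, "imputer_name": "mean"}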
@@ -69,14 +76,25 @@ class ImputationResult:

@dataclass
class NormalizationResult:
"""Container class to store the results of normalization.
"""Dataclass to store the results of normalization.

Parameters
----------
scaled_df: DataFrame
The normalized DataFrame.
normalization_representation: dict[str, dict]
A representation of the normalization.
The reconstruction information for the normalizer. Empty if no normalization
was applied. Inner structure depends on normalizer.

Structure for MinMaxScaler:
originalMinValues: list[float]
The original minimum values for each column, in the order of the cols key.
originalMaxValues: list[float]
The original maximum values for each column, in the order of the cols key.

Structure for StandardScaler:
col_sums: dict[str, float]
The sum of each column.
"""

scaled_df: DataFrame
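For illustration, a ``normalization_representation`` for the standard scaler would look like this (column names and sums are example data)::

    {"norm_name": "standard", "norm_reconstruction": {"col_sums": {"salary": 100.0, "age": 40.0}}}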
@@ -174,6 +192,8 @@ def apply_norm(
A dataclass containing the normalized DataFrame with only the
columns listed in ``cols`` retained in the ``scaled_df`` element,
and a dict representation of the transformation in the ``normalization_representation``
variable. Inner structure depends on the normalizer; see the ``NormalizationResult`` docstring
for details.

Raises
------
@@ -207,7 +227,9 @@ def apply_norm(
scaled_df, norm_reconstruction = _apply_standard_transform(imputed_df, cols, out_dtype)
norm_representation["norm_reconstruction"] = norm_reconstruction
elif shared_norm == "rank-gauss":
assert len(cols) == 1, "Rank-Gauss numerical transformation only supports single column"
assert (
len(cols) == 1
), f"Rank-Gauss numerical transformation only supports single column, got {cols}"
norm_representation["norm_reconstruction"] = {}
column_name = cols[0]
select_df = imputed_df.select(column_name)
@@ -251,6 +273,8 @@ def _apply_standard_transform(
) -> tuple[DataFrame, dict]:
"""Applies standard scaling to the input DataFrame, individually to each of the columns.

Each value in a column is divided by the sum of all values in that column.

Parameters
----------
input_df : DataFrame
@@ -267,6 +291,10 @@
tuple[DataFrame, dict]
The transformed dataframe and the representation of the standard transform as dict.

Representation structure::
col_sums: dict[str, float]
The sum of each column, {col_name: sum}.

Raises
------
RuntimeError
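A minimal PySpark sketch of the sum-based scaling described above, with an illustrative column name and values::

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1.0,), (3.0,)], ["salary"])

    # Compute the column sum once, then divide every value by it
    col_sum = df.agg(F.sum("salary")).collect()[0][0]  # 4.0
    scaled = df.select((F.col("salary") / col_sum).alias("salary"))
    scaled.show()  # rows become 0.25 and 0.75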
@@ -297,7 +325,12 @@ def _apply_min_max_transform(
original_min_vals: Optional[list[float]] = None,
original_max_vals: Optional[list[float]] = None,
) -> tuple[DataFrame, dict]:
"""Applies min max normalization to the input.
"""Applies min-max normalization to the input, rescaling each feature to the [0, 1] range.

Each value ``x`` in a column is transformed as follows:

.. math::

x = \\frac{x - \\text{col_min}}{\\text{col_max} - \\text{col_min}}

Parameters
----------
@@ -309,15 +342,21 @@
Other cols that we want to retain
out_dtype : str
Numerical type of output data.
original_min_vals : Optional[list[float]], optional
original_min_vals : Optional[list[float]]
Pre-calculated minimum values for each column, by default None
original_max_vals : Optional[list[float]], optional
original_max_vals : Optional[list[float]]
Pre-calculated maximum values for each column, by default None

Returns
-------
tuple[DataFrame, dict]
The transformed DataFrame and the representation of the min-max transform as dict.

Representation structure:
originalMinValues: list[float]
The original minimum values for each column, in the order of the cols key.
originalMaxValues: list[float]
The original maximum values for each column, in the order of the cols key.
"""

# Use the map to get the corresponding data type object, or raise an error if not found
@@ -366,6 +405,8 @@ def _apply_min_max_transform(
)

# Fit a pipeline on just the dummy DF
# MinMaxScaler computes the minimum and maximum of dummy_df
# to be used for later scaling
scaler_pipeline = pipeline.fit(dummy_df)
else:
# Fit a pipeline on the entire input DF
@@ -374,8 +415,7 @@
# Transform the input DF
scaled_df = scaler_pipeline.transform(input_df).drop(*vector_cols).drop(*cols)

# F.col(scaled_col_name).getField('values'[0] get the first element of a SparseVector
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.linalg.SparseVector.html
# Convert Spark Vector to array and get its first element and rename col to original name
scaled_df = scaled_df.select(
*[
(vector_to_array(F.col(scaled_col_name), dtype=out_dtype)[0].alias(orig_col))
@@ -387,6 +427,10 @@
# MinMaxScalerModel for each feature. So we skip the first num_cols to
# get just the MinMaxScalerModels
min_max_models: list[MinMaxScalerModel] = scaler_pipeline.stages[len(cols) :]
for min_max_model in min_max_models:
assert isinstance(
min_max_model, MinMaxScalerModel
), f"Expected MinMaxScalerModel, got {type(min_max_model)}"
norm_reconstruction = {
"originalMinValues": [
min_max_model.originalMin.toArray()[0] for min_max_model in min_max_models
@@ -417,6 +461,10 @@ class DistNumericalTransformation(DistributedTransformation):
Output feature dtype
epsilon: float
Epsilon for normalization used to avoid INF float during computation.
json_representation: Optional[dict]
JSON representation of the transformation. If provided, the transformation
will be applied using this representation.
See ``DistNumericalTransformation.get_json_representation()`` for dict structure.
"""

def __init__(
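The re-fit trick used in the min-max hunk above can be sketched in isolation: fitting ``MinMaxScaler`` on a two-row dummy DataFrame holding the pre-computed min and max yields a model whose ``originalMin``/``originalMax`` match the saved values. A minimal sketch, with an illustrative column name and values::

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import MinMaxScaler, VectorAssembler
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Two rows holding the saved min (22.0) and max (33.0) are all
    # MinMaxScaler needs to fit the same model as on the full data
    dummy_df = spark.createDataFrame([(22.0,), (33.0,)], ["age"])

    assembler = VectorAssembler(inputCols=["age"], outputCol="age_vec")
    scaler = MinMaxScaler(inputCol="age_vec", outputCol="age_scaled")
    pipeline_model = Pipeline(stages=[assembler, scaler]).fit(dummy_df)

    min_max_model = pipeline_model.stages[-1]
    print(min_max_model.originalMin, min_max_model.originalMax)  # [22.0] [33.0]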
59 changes: 59 additions & 0 deletions graphstorm-processing/tests/test_dist_feature_transformer.py
@@ -0,0 +1,59 @@
"""Tests for DistFeatureTransformer which wraps individual transformations with a common API"""

from unittest.mock import Mock

from pyspark.sql import DataFrame, SparkSession

from graphstorm_processing.data_transformations.dist_feature_transformer import (
DistFeatureTransformer,
)
from graphstorm_processing.config.numerical_configs import NumericalFeatureConfig


def test_precomputed_transformer(spark: SparkSession, user_df: DataFrame):
"""Ensure the pre-computed transformation is used when we provide one as input"""

num_feature_config = NumericalFeatureConfig(
{
"column": "age",
"transformation": {
"name": "numerical",
"kwargs": {"imputer": "mean", "normalizer": "min-max"},
},
}
)

json_rep = {
"cols": ["age"],
"imputer_model": {
"imputed_val_dict": {"age": 27.2},
"imputer_name": "mean",
},
"normalizer_model": {
"norm_name": "min-max",
"norm_reconstruction": {
"originalMinValues": [33],
"originalMaxValues": [22],
},
},
"out_dtype": "float32",
"transformation_name": "DistNumericalTransformation",
}

numerical_transformer = DistFeatureTransformer(
num_feature_config,
spark,
json_rep,
)

# Mock the inner transformation function to check if it's called
numerical_transformer.transformation.apply_precomputed_transformation = Mock()

# Call the outer transformation
_, new_rep = numerical_transformer.apply_transformation(user_df)

# Assert the precomputed method was called exactly once
numerical_transformer.transformation.apply_precomputed_transformation.assert_called_once()

# Assert the newly returned rep matches the previous one
assert new_rep == json_rep
@@ -460,10 +460,13 @@ def test_rank_gauss_reshuffling(spark: SparkSession, check_df_schema, epsilon):
)


def test_json_representation(input_df: DataFrame, check_df_schema):
@pytest.mark.parametrize("normalizer", ["min-max", "none", "standard", "rank-gauss"])
def test_json_representation(input_df: DataFrame, check_df_schema, normalizer):
"""Test that the generated representation is correct"""
imputer = "mean"
cols = ["salary", "age"] if normalizer != "rank-gauss" else ["salary"]
dist_numerical_transformation = DistNumericalTransformation(
["salary", "age"], imputer="mean", normalizer="min-max"
cols, imputer=imputer, normalizer=normalizer
)
transformed_df = dist_numerical_transformation.apply(input_df)
json_rep = dist_numerical_transformation.get_json_representation()
@@ -472,9 +475,9 @@ def test_json_representation(input_df: DataFrame, check_df_schema):
assert "imputer_model" in json_rep
assert "normalizer_model" in json_rep
assert "out_dtype" in json_rep
assert json_rep["cols"] == ["salary", "age"]
assert json_rep["imputer_model"]["imputer_name"] == "mean"
assert json_rep["normalizer_model"]["norm_name"] == "min-max"
assert json_rep["cols"] == cols
assert json_rep["imputer_model"]["imputer_name"] == imputer
assert json_rep["normalizer_model"]["norm_name"] == normalizer

check_df_schema(transformed_df)
