[GSProcessing] Add structure for saving transformation JSON files. (#857)

## Description of changes:

* This commit only adds saving the transformations in a JSON
representation.
* Loading and applying the pre-computed transformation will come in an
upcoming PR.
* First implemented for categorical transformation.

Detailed changes:

To support the saving and loading of pre-computed transformations, we
use the following design.

* A new JSON file is created in the output that includes JSON representations
  of the transformations for which we have implemented a representation.
* The base class `DistributedTransformation` now takes an _optional_
SparkSession and an _optional_ `json_representation` `dict` during
initialization.
* We add a `get_json_representation()` function to
`DistributedTransformation` which returns the JSON representation of the
transformation if it's not None, or an empty dict otherwise. The assumed
contract is that each individual transformation must populate its
`json_representation` dict during the call to `apply()` which applies
the transformation for the first time.
* The `DistFeatureTransformer` now also takes a SparkSession and a
`json_representation` `dict` in its constructor.
The Spark session is currently only passed to the constructor of
`DistCategoryTransformation`; we need it to create Spark DataFrames
during `apply()`.
* The `apply_transformation` function of `DistFeatureTransformer` now
returns a tuple, `(processed_df: DataFrame, json_representation: dict)`.
The JSON representation is retrieved using the
`get_json_representation()` function of each distributed transformation
implementation. Currently only the categorical transformation will
return a non-empty dict.
* We add a `transformation_representations` dict as a member of
`DistHeterogeneousGraphLoader`. This dict is used to gather the
representations of each feature as we iterate through them. The
structure is:

```python
{
  "node_features": {
    "node_type1": {
      "feature_name1": {
         # feature representation goes here
      }
    },
    "node_type2": {...}
  },
  "edges_features": {...}
}
```
* At the end of graph loading this dict is saved to storage under the
output prefix as `precomputed_transformations.json`. This file will be
used to re-construct the feature transformations in an upcoming PR; a
rough sketch of the gather-and-save flow is shown right after this list.
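
As a rough sketch of how these pieces fit together on the loader side (the
helper name, its arguments, and the plain-file write below are illustrative
assumptions, not the actual `DistHeterogeneousGraphLoader` code):

```python
import json
import os


def gather_and_save_representations(
    transformers_per_feature: dict, input_df, node_type: str, output_prefix: str
) -> dict:
    """Hypothetical helper: gather per-feature JSON representations and save them."""
    representations: dict = {"node_features": {node_type: {}}, "edge_features": {}}
    for feat_name, transformer in transformers_per_feature.items():
        # apply_transformation now returns a (DataFrame, dict) tuple
        processed_df, json_repr = transformer.apply_transformation(input_df)
        if json_repr:
            # Only transformations with an implemented representation
            # (currently the categorical one) return a non-empty dict
            representations["node_features"][node_type][feat_name] = json_repr
        # ... processed_df is written to the output as before ...
    out_path = os.path.join(output_prefix, "precomputed_transformations.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(representations, f, indent=4)
    return representations
```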

Particularly for the `DistCategoryTransformation`:

* We choose to save its representation as a dict with the following
structure:

```
string_indexer_labels_array:
    tuple[tuple[str]], outer tuple has num_cols elements,
    each inner tuple has num_cats elements, each str is a category string.
    Spark uses this to represent the one-hot index for each category, its
    position in the inner tuple is the one-hot-index position for the string.
    Categories are sorted by their frequency in the data.
cols:
    list[str], with num_cols elements, each item is one column name that was used in the 
    original transformation.
per_col_label_to_one_hot_idx: 
    dict[str, dict[str, int]], with num_cols elements, each with num_categories elements, 
    a more verbose mapping from column name to dict of string to one-hot-index position.
transformation_name:
    str, will be 'DistCategoryTransformation'
``` 
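
Expressed as Python type hints, the same structure looks roughly like the
sketch below (as serialized to JSON, so the tuples become lists; this class is
only illustrative and not part of the PR):

```python
from typing import TypedDict


class CategoricalRepresentation(TypedDict):
    """Illustrative typing of the saved categorical representation (not PR code)."""

    # One inner list per column; each inner list holds that column's category
    # strings ordered by descending frequency, i.e. by one-hot index position.
    string_indexer_labels_array: list[list[str]]
    # Column names used in the original transformation.
    cols: list[str]
    # Verbose mapping: column name -> {category string -> one-hot index}.
    per_col_label_to_one_hot_idx: dict[str, dict[str, int]]
    # Always "DistCategoryTransformation" for this transformation.
    transformation_name: str
```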

The `string_indexer_labels_array` comes from Spark's own representation of
its StringIndexer class, encapsulated in the `labelsArray` variable of
`StringIndexerModel`. See the docs here:
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.StringIndexerModel.html#pyspark.ml.feature.StringIndexerModel.labelsArray
and
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.StringIndexer.html#pyspark.ml.feature.StringIndexer
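
The snippet below is a self-contained illustration of where that array comes
from; the toy DataFrame, local session, and column names are assumptions for
the example, not code from this PR:

```python
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy input matching the example below: "wa" twice, "ca", a missing value, "ny"
input_df = spark.createDataFrame(
    [("wa",), ("ca",), ("wa",), (None,), ("ny",)], ["state"]
)

str_indexer = StringIndexer(
    inputCols=["state"],
    outputCols=["state_processed"],
    handleInvalid="keep",             # map missing/unseen values instead of erroring
    stringOrderType="frequencyDesc",  # most frequent category gets index 0
)
str_indexer_model = str_indexer.fit(input_df)

# One inner list per input column, categories ordered by descending frequency:
# [['wa', 'ca', 'ny']]
print(str_indexer_model.labelsArray)
```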

Example representation for input data:

```
state
wa
ca
wa
# no string here, represents missing value
ny
```

```json
{
    "node_features": {
        "user": {
            "state": {
                "transformation_name": "DistCategoryTransformation",
                "string_indexer_labels_arrays": [["wa", "ca", "ny"]],
                "cols": ["state"],
                "per_col_label_to_one_hot_idx": {
                    "state": {
                        "wa": 0,
                        "ca": 1,
                        "ny": 2
                    }
                }
            }
        }
    },
    "edge_features": {}
}
```

To reconstruct the transformation on a new DF we will iterate over the
`cols`, and for each, create one-hot vectors according to the position of
each string in the corresponding `labels_array`.

E.g. given a labels array `["string2", "string1", "string3"]`, when we
encounter `string2` in the input data, it will be transformed to `[1, 0,
0]`, since its position in the labels array is 0. `string1` will have the
representation `[0, 1, 0]`, etc.
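
Since that loading and re-applying logic lands in a follow-up PR, the snippet
below is only a minimal NumPy sketch of the reconstruction for a single
column; the function name and signature are hypothetical:

```python
import numpy as np


def reapply_categorical(values: list, labels_array: list) -> np.ndarray:
    """Sketch: one-hot encode `values` using one column's saved labels array.

    Unknown or missing values get the all-zeroes vector, mirroring how Spark
    treats unseen categories.
    """
    label_to_idx = {label: idx for idx, label in enumerate(labels_array)}
    encoded = np.zeros((len(values), len(labels_array)), dtype=np.float32)
    for row, value in enumerate(values):
        if value in label_to_idx:
            encoded[row, label_to_idx[value]] = 1.0
    return encoded


# ["wa", "ca", "ny"] is the labels array from the example above
print(reapply_categorical(["wa", None, "ny"], ["wa", "ca", "ny"]))
# [[1. 0. 0.]
#  [0. 0. 0.]
#  [0. 0. 1.]]
```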

All the code changes in the apply function of
`DistCategoryTransformation` are meant to build up this representation.

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: jalencato <[email protected]>
thvasilo and jalencato authored Jun 10, 2024
1 parent e1d128e commit 71838ba
Showing 14 changed files with 685 additions and 293 deletions.
3 changes: 3 additions & 0 deletions graphstorm-processing/graphstorm_processing/constants.py
@@ -85,3 +85,6 @@ class FilesystemType(Enum):
"3.4": "3.3.4",
"3.3": "3.3.2",
}

########## Precomputed transformations ################
TRANSFORMATIONS_FILENAME = "precomputed_transformations.json"
@@ -16,7 +16,7 @@

import logging

from pyspark.sql import DataFrame
from pyspark.sql import DataFrame, SparkSession

from graphstorm_processing.config.feature_config_base import FeatureConfig
from .dist_transformations import (
@@ -37,13 +37,19 @@ class DistFeatureTransformer(object):
which can then be applied through a call to apply_transformation.
"""

def __init__(self, feature_config: FeatureConfig):
def __init__(
self, feature_config: FeatureConfig, spark: SparkSession, json_representation: dict
):
feat_type = feature_config.feat_type
feat_name = feature_config.feat_name
args_dict = feature_config.transformation_kwargs
self.transformation: DistributedTransformation
# TODO: We will use this to re-apply transformations
self.json_representation = json_representation

default_kwargs = {"cols": feature_config.cols}
default_kwargs = {
"cols": feature_config.cols,
}
logging.info("Feature name: %s", feat_name)
logging.info("Transformation type: %s", feat_type)

@@ -56,7 +62,9 @@ def __init__(self, feature_config: FeatureConfig):
elif feat_type == "bucket-numerical":
self.transformation = DistBucketNumericalTransformation(**default_kwargs, **args_dict)
elif feat_type == "categorical":
self.transformation = DistCategoryTransformation(**default_kwargs, **args_dict)
self.transformation = DistCategoryTransformation(
**default_kwargs, **args_dict, spark=spark
)
elif feat_type == "multi-categorical":
self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict)
elif feat_type == "huggingface":
@@ -66,14 +74,24 @@
f"Feature {feat_name} has type: {feat_type} that is not supported"
)

def apply_transformation(self, input_df: DataFrame) -> DataFrame:
def apply_transformation(self, input_df: DataFrame) -> tuple[DataFrame, dict]:
"""
Given an input dataframe, select only the relevant columns
Given an input DataFrame, select only the relevant columns
and apply the expected transformation to them.
Returns
-------
tuple[DataFrame, dict]
A tuple with two items, the first is the transformed input DataFrame,
the second is a JSON representation of the transformation. This will
allow us to apply the same transformation to new data.
"""
input_df = input_df.select(self.transformation.cols) # type: ignore

return self.transformation.apply(input_df)
return (
self.transformation.apply(input_df),
self.transformation.get_json_representation(),
)

def get_transformation_name(self) -> str:
"""
@@ -15,29 +15,44 @@
"""

from abc import ABC, abstractmethod
from typing import Sequence
from typing import Optional, Sequence

from pyspark.sql import DataFrame
from pyspark.sql import DataFrame, SparkSession


class DistributedTransformation(ABC):
"""
Base class for all distributed transformations.
"""

def __init__(self, cols: Sequence[str]) -> None:
def __init__(
self,
cols: Sequence[str],
spark: Optional[SparkSession] = None,
json_representation: Optional[dict] = None,
) -> None:
self.cols = cols
self.spark = spark
self.json_representation = json_representation

@abstractmethod
def apply(self, input_df: DataFrame) -> DataFrame:
"""
Applies the transformation to the input DataFrame.
The returned dataframe will only contain the columns specified during initialization.
Applies the transformation to the input DataFrame, and returns the modified
DataFrame.
The returned DataFrame will only contain the columns specified during initialization.
"""

def get_json_representation(self) -> dict:
"""Get a JSON representation of the transformation."""
# TODO: Should we try to guarantee apply() has run before this?
if self.json_representation:
return self.json_representation
else:
return {}

@staticmethod
@abstractmethod
def get_transformation_name() -> str:
"""
Get the name of the transformation
"""
"""Get the name of the transformation."""
@@ -14,11 +14,12 @@
limitations under the License.
"""

from typing import Dict, List, Optional, Sequence
from typing import List, Optional, Sequence

import numpy as np
import pandas as pd

from pyspark.sql import DataFrame, functions as F
from pyspark.sql import DataFrame, functions as F, SparkSession
from pyspark.sql.functions import when
from pyspark.sql.types import ArrayType, FloatType, StringType
from pyspark.ml.feature import StringIndexer, OneHotEncoder
@@ -40,18 +41,19 @@ class DistCategoryTransformation(DistributedTransformation):
Transforms categorical features into a vector of one-hot-encoded values.
"""

def __init__(self, cols: List[str]) -> None:
super().__init__(cols)
def __init__(self, cols: list[str], spark: SparkSession) -> None:
super().__init__(cols, spark)

@staticmethod
def get_transformation_name() -> str:
return "DistCategoryTransformation"

def apply(self, input_df: DataFrame) -> DataFrame:
processed_col_names = []
for col in self.cols:
processed_col_names.append(col + "_processed")
distinct_category_counts = input_df.groupBy(col).count() # type: DataFrame
top_categories_per_col: dict[str, list] = {}
for current_col in self.cols:
processed_col_names.append(current_col + "_processed")
distinct_category_counts = input_df.groupBy(current_col).count() # type: DataFrame
num_distinct_categories = distinct_category_counts.count()

# Conditionally replace rare categories with single placeholder
@@ -60,17 +62,23 @@ def apply(self, input_df: DataFrame) -> DataFrame:
MAX_CATEGORIES_PER_FEATURE - 1
)
top_categories_set = {row[0] for row in top_categories}
top_categories_per_col[current_col] = list(top_categories_set)
# TODO: Ideally we don't want to use withColumn in a loop
input_df = input_df.withColumn(
col,
when(input_df[col].isin(top_categories_set), input_df[col]).otherwise(
RARE_CATEGORY
),
current_col,
when(
input_df[current_col].isin(top_categories_set), input_df[current_col]
).otherwise(RARE_CATEGORY),
)
else:
top_categories_per_col[current_col] = [
x[current_col] for x in distinct_category_counts.select(current_col).collect()
]

# Replace empty string cols with None
input_df = input_df.withColumn(
col, when(input_df[col] == "", None).otherwise(input_df[col])
current_col,
when(input_df[current_col] == "", None).otherwise(input_df[current_col]),
)

# We first convert the strings to float indexes
@@ -105,8 +113,80 @@
]
)

# Structure: {column_name: {category_string: index_value, ...}. ...}
per_col_label_to_one_hot_idx: dict[str, dict[str, int]] = {}

# To get the transformed values for each value in each col
# we need to create a DataFrame with the top categories for the current
# col, then fill in the rest of the values with placeholders
# and pass the generated DF through the one-hot encoder
for current_col, processed_col in zip(self.cols, processed_col_names):
other_cols = [x for x in self.cols if x != current_col]
top_str_categories_list = top_categories_per_col[current_col]
# Spark doesn't include missing/unknown values in the vector
# representation, just uses the all-zeroes vector for them,
# so we remove instances of None from the list of strings to model
if None in top_str_categories_list:
top_str_categories_list.remove(None)

# Each col might have different number of top categories, we need one DF per col
num_current_col_cats = len(top_str_categories_list)
# We don't care about values for the other cols in this iteration,
# just fill with empty string
placeholder_vals = [""] * num_current_col_cats
placeholder_cols = [placeholder_vals for _ in range(len(self.cols) - 1)]
current_col_unique_vals = [list(top_str_categories_list)]
# We need to create a DF where all cols have num_rows == num_current_col_cats
# and the current col needs to be the first col in the DF.
vals_dict = dict(
zip([current_col] + other_cols, current_col_unique_vals + placeholder_cols)
)

# One hot encoder expects a DF with all cols that were used to train it
# so we use the top-MAX_CATEGORIES_PER_FEATURE values for the current col,
# and the placeholders for the rest
top_str_categories_df = self.spark.createDataFrame(pd.DataFrame(vals_dict))
top_indexed_categories_df = str_indexer_model.transform(top_str_categories_df)

# For the current col, get the one-hot index for each of its category strings
# by passing the top-k values DF through the one-hot encoder model
per_col_label_to_one_hot_idx[current_col] = {
x[current_col]: int(x[processed_col])
for x in one_hot_encoder_model.transform(top_indexed_categories_df).collect()
}

# see get_json_representation() docstring for structure
self.json_representation = {
"string_indexer_labels_array": str_indexer_model.labelsArray,
"cols": self.cols,
"per_col_label_to_one_hot_idx": per_col_label_to_one_hot_idx,
"transformation_name": self.get_transformation_name(),
}

return dense_vector_features

def get_json_representation(self) -> dict:
"""Representation of the single-category transformation for one or more columns.
Returns
-------
dict
Structure:
string_indexer_labels_array:
tuple[tuple[str]], outer tuple has num_cols elements,
each inner tuple has num_cats elements, each str is a category string.
Spark uses this to represent the one-hot index for each category, its
position in the inner tuple is the one-hot-index position for the string.
Categories are sorted by their frequency in the data.
cols:
list[str], with num_cols elements
per_col_label_to_one_hot_idx:
dict[str, dict[str, int]], with num_cols elements, each with num_categories elements
transformation_name:
str, will be 'DistCategoryTransformation'
"""
return self.json_representation


class DistMultiCategoryTransformation(DistributedTransformation):
"""
@@ -135,7 +215,7 @@ def __init__(self, cols: Sequence[str], separator: str) -> None:
if self.separator in SPECIAL_CHARACTERS:
self.separator = f"\\{self.separator}"

self.value_map = {} # type: Dict[str, int]
self.value_map: dict[str, int] = {}

@staticmethod
def get_transformation_name() -> str: