
Commit
[GSProcessing] Add option to truncate vectors with no-op transformation.
thvasilo committed Jul 13, 2024
1 parent 766d007 commit 7d527ac
Showing 7 changed files with 98 additions and 17 deletions.
@@ -23,7 +23,7 @@ The GSProcessing input data configuration has two top-level objects:
.. code-block:: json
{
"version": "gsprocessing-v1.0",
"version": "gsprocessing-v0.3.1",
"graph": {}
}
@@ -380,6 +380,12 @@ arguments.
split the values in the column and create a vector column
output. Example: for a separator ``'|'`` the CSV value
``1|2|3`` would be transformed to a vector, ``[1, 2, 3]``.
+- ``truncate_dim`` (Integer, optional): Relevant for vector inputs.
+Allows you to truncate the input vector to the first ``truncate_dim``
+values, which can be useful when your inputs are `Matryoshka representation
+learning embeddings <https://arxiv.org/abs/2205.13147>`_.
+- ``out_dtype`` (String, optional): Specify the data type of the transformed feature.
+Currently, only ``float32`` and ``float64`` are supported.
- ``numerical``

- Transforms a numerical column using a missing data imputer and an
@@ -400,7 +406,7 @@ arguments.
- ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-gauss first ranks all values,
converts the ranks to the -1/1 range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
to a Gaussian distribution shape. This transformation only supports a single column as input.
-- ``out_dtype`` (Optional): Specify the data type of the transformed feature.
+- ``out_dtype`` (String, optional): Specify the data type of the transformed feature.
Currently, only ``float32`` and ``float64`` are supported.
- ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
to avoid infinite values during normalization.
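For reference, a complete feature entry using the new ``truncate_dim`` option documented above looks like the following (taken from the test configuration added later in this commit):

    {
        "column": "multi",
        "name": "no-op-truncated",
        "transformation": {
            "name": "no-op",
            "kwargs": {
                "separator": "|",
                "truncate_dim": 1
            }
        }
    }

Here each ``multi`` value is split on ``|`` into a float vector, which is then truncated to its first dimension.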
@@ -15,7 +15,7 @@
"""

import abc
-from typing import Any, Mapping, Sequence
+from typing import Any, Mapping, Optional, Sequence

from graphstorm_processing.constants import VALID_OUTDTYPE, TYPE_FLOAT32

@@ -89,18 +89,24 @@ class NoopFeatureConfig(FeatureConfig):
Supported kwargs
----------------
out_dtype: str
Output feature dtype. Currently, we support ``float32`` and ``float64``.
Default is ``float32``.
separator: str
When provided, will treat the input as strings, split each value in the string using
the separator, and convert the resulting list of floats into a float-vector feature.
+truncate_dim: int
+When provided, will truncate the output float-vector feature to the specified dimension.
+This is useful when the feature is a multi-dimensional vector and we only need
+a subset of the dimensions, e.g. for Matryoshka Representation Learning embeddings.
"""

def __init__(self, config: Mapping):
super().__init__(config)

-self.value_separator = None
-self.out_dtype = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
-if self._transformation_kwargs:
-self.value_separator = self._transformation_kwargs.get("separator")
+self.out_dtype: str = self._transformation_kwargs.get("out_dtype", TYPE_FLOAT32)
+self.value_separator: Optional[str] = self._transformation_kwargs.get("separator", None)
+self.truncate_dim: Optional[int] = self._transformation_kwargs.get("truncate_dim", None)

self._sanity_check()

@@ -111,3 +117,6 @@ def _sanity_check(self) -> None:
assert (
self.out_dtype in VALID_OUTDTYPE
), f"Unsupported output dtype, expected one of {VALID_OUTDTYPE}, got {self.out_dtype}"
+assert self.truncate_dim is None or isinstance(
+self.truncate_dim, int
+), f"truncate_dim should be an int or None, got {type(self.truncate_dim)}"
@@ -14,7 +14,9 @@
limitations under the License.
"""

-from typing import List, Optional
+import warnings
+from typing import Optional


from pyspark.sql import DataFrame
from pyspark.sql import functions as F
@@ -35,16 +37,24 @@ class NoopTransformation(DistributedTransformation):
Parameters
----------
-cols : List[str]
+cols : list[str]
The list of columns to parse as floats or lists of floats
separator : Optional[str], optional
Optional separator to use to split the string, by default None
out_dtype: str
The output feature dtype
+truncate_dim: int
+When provided, will truncate the output float-vector feature to the specified dimension.
+This is useful when the feature is a multi-dimensional vector and we only need
+a subset of the dimensions, e.g. for Matryoshka Representation Learning embeddings.
"""

def __init__(
-self, cols: List[str], out_dtype: str = TYPE_FLOAT32, separator: Optional[str] = None
+self,
+cols: list[str],
+out_dtype: str = TYPE_FLOAT32,
+separator: Optional[str] = None,
+truncate_dim: Optional[int] = None,
) -> None:
super().__init__(cols)
# TODO: Support multiple cols?
@@ -55,6 +65,18 @@ def __init__(
# escape special chars to be used as separators
if self.separator in SPECIAL_CHARACTERS:
self.separator = f"\\{self.separator}"
+self.truncate_dim = truncate_dim
+
+def _truncate_vector_df(self, input_df: DataFrame) -> DataFrame:
+"""Truncates every vector in the input DF to the specified dimension."""
+assert self.truncate_dim is not None
+return input_df.select(
+[
+# SQL array indexes start at 1
+F.slice(F.col(column), 1, self.truncate_dim).alias(column)
+for column in self.cols
+]
+)
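The truncation helper leans on Spark's 1-based array indexing via ``F.slice``. A minimal standalone illustration of that call (toy column name and values; a local session is assumed):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([([10, 20, 30],)], ["feat"])
    # slice(column, 1, 2) keeps the first two elements, since SQL arrays start at 1
    df.select(F.slice(F.col("feat"), 1, 2).alias("feat")).show()  # row: [10, 20]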

def apply(self, input_df: DataFrame) -> DataFrame:
"""
@@ -72,13 +94,18 @@ def apply(self, input_df: DataFrame) -> DataFrame:
f"Unsupported array type {col_datatype.elementType} "
f"for column {self.cols[0]}"
)
-return input_df
+if self.truncate_dim:
+return self._truncate_vector_df(input_df)
+else:
+return input_df
elif isinstance(col_datatype, NumericType):
+if self.truncate_dim is not None:
+warnings.warn(f"Trying to use {self.truncate_dim=} on a DataFrame of scalars!")
return input_df

# Otherwise we'll try to convert the values from list of strings to list of Doubles

-def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
+def str_list_to_float_vec(string_list: Optional[list[str]]) -> Optional[list[float]]:
if string_list:
return [float(x) for x in string_list]
return None
@@ -95,15 +122,17 @@ def str_list_to_float_vec(string_list: Optional[List[str]]) -> Optional[List[float]]:
for column in self.cols
]
)
-return input_df
else:
-return input_df.select(
+input_df = input_df.select(
[
F.col(column).cast(DTYPE_MAP[self.out_dtype]).alias(column)
for column in self.cols
]
)

+if self.truncate_dim:
+return self._truncate_vector_df(input_df)
+return input_df

@staticmethod
def get_transformation_name() -> str:
"""
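Putting the pieces together, a hedged end-to-end sketch of the separator-then-truncate path (the import path and toy data are assumptions, not part of this commit):

    from pyspark.sql import SparkSession
    # Import path assumed from the package layout visible in this diff
    from graphstorm_processing.data_transformations.dist_transformations import (
        NoopTransformation,
    )

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([("1|2|3",), ("4|5|6",)], ["feat"])
    transformation = NoopTransformation(cols=["feat"], separator="|", truncate_dim=2)
    # Splits on "|", casts to the default float32 dtype, then keeps two dimensions,
    # yielding rows [1.0, 2.0] and [4.0, 5.0]
    transformation.apply(df).show()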
@@ -209,11 +209,11 @@ def __init__(
self.precomputed_transformations = {}

if "version" in dataset_config_dict:
-config_version = dataset_config_dict["version"]
-if config_version == "gsprocessing-v1.0":
+config_version: str = dataset_config_dict["version"]
+if config_version.startswith("gsprocessing"):
logging.info("Parsing config file as GSProcessing config")
self.gsp_config_dict = dataset_config_dict["graph"]
-elif config_version == "gconstruct-v1.0":
+elif config_version.startswith("gconstruct"):
logging.info("Parsing config file as GConstruct config")
converter = GConstructConfigConverter()
self.gsp_config_dict = converter.convert_to_gsprocessing(dataset_config_dict)[
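With the prefix check, any ``gsprocessing-*`` or ``gconstruct-*`` version string now selects the matching parser, instead of only the exact ``gsprocessing-v1.0`` and ``gconstruct-v1.0`` values. A standalone sketch of the dispatch logic (hypothetical helper, not part of the commit):

    def detect_config_format(version: str) -> str:
        # Hypothetical helper mirroring the startswith dispatch above
        if version.startswith("gsprocessing"):
            return "gsprocessing"
        if version.startswith("gconstruct"):
            return "gconstruct"
        raise ValueError(f"Unknown config version: {version}")

    assert detect_config_format("gsprocessing-v0.3.1") == "gsprocessing"
    assert detect_config_format("gconstruct-v1.0") == "gconstruct"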
@@ -57,6 +57,17 @@
}
}
},
+{
+"column": "multi",
+"name": "no-op-truncated",
+"transformation": {
+"name": "no-op",
+"kwargs": {
+"separator": "|",
+"truncate_dim": 1
+}
+}
+},
{
"column": "occupation",
"transformation": {
2 changes: 2 additions & 0 deletions graphstorm-processing/tests/test_dist_heterogenous_loader.py
@@ -63,6 +63,7 @@
"input_ids": 16,
"token_type_ids": 16,
"multi": 2,
"no-op-truncated": 1,
"state": 3,
}
},
@@ -296,6 +297,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoader):
"test_mask",
"age",
"multi",
"no-op-truncated",
"state",
"input_ids",
"attention_mask",
24 changes: 24 additions & 0 deletions graphstorm-processing/tests/test_dist_noop_transformation.py
@@ -77,6 +77,30 @@ def test_noop_floatvector_transformation(spark: SparkSession, check_df_schema):
assert_array_equal(expected_values, transformed_values)


+def test_noop_floatvector_truncation(spark: SparkSession, check_df_schema):
+"""No-op transformation for numerical vector columns with truncation"""
+data = [([10, 20],), ([30, 40],), ([50, 60],), ([70, 80],), ([90, 100],)]
+
+col_name = "feat"
+schema = StructType([StructField("feat", ArrayType(IntegerType(), True), True)])
+vec_df = spark.createDataFrame(data, schema=schema)
+
+noop_transformer = NoopTransformation(
+[col_name],
+truncate_dim=1,
+)
+
+transformed_df = noop_transformer.apply(vec_df)
+
+expected_values = [[10], [30], [50], [70], [90]]
+
+check_df_schema(transformed_df)
+
+transformed_values = [row[col_name] for row in transformed_df.collect()]
+
+assert_array_equal(expected_values, transformed_values)


def test_noop_largegint_transformation(spark: SparkSession, check_df_schema):
"""No-op transformation for long numerical columns"""
large_int = 4 * 10**18