From 096488628cdc6a88d87710925627c0ca34954b9a Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 10 Jan 2024 19:37:14 +0000 Subject: [PATCH 01/38] add gconstruct converter --- .../config_conversion/gconstruct_converter.py | 7 +++++++ graphstorm-processing/tests/test_converter.py | 13 ++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index 7761acdcf8..d03cb380f4 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -134,6 +134,13 @@ def _convert_feature(feats: list[dict]) -> list[dict]: else: gsp_transformation_dict["name"] = "categorical" gsp_transformation_dict["kwargs"] = {} + elif gconstruct_transform_dict["name"] == "tokenize_hf": + gsp_transformation_dict["name"] = "bert" + gsp_transformation_dict["kwargs"] = { + "normalizer": "tokenize", + "bert_model": gconstruct_transform_dict["bert_model"], + "max_seq_length": gconstruct_transform_dict["max_seq_length"] + } # TODO: Add support for other common transformations here else: raise ValueError( diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index 7afb0df4b2..8dc883f516 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -56,7 +56,7 @@ def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node node_dict["nodes"][0]["features"] = [ { "feature_col": ["paper_title"], - "transform": {"name": "tokenize_hf"}, + "transform": {"name": "bert_hf"}, } ] @@ -235,6 +235,10 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "feature_col": ["num_citations"], "transform": {"name": "to_categorical", 
"separator": ","}, }, + { + "feature_col": ["citation_name"], + "transform": {"name": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, + }, ], "labels": [ {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]} @@ -321,6 +325,13 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "kwargs": {"separator": ","}, }, }, + { + "column": "citation_name", + "transformation": { + "name": "bert", + "kwargs": {"normalizer": "tokenize", "bert_model": "bert", "max_seq_length": 64}, + }, + }, ] assert nodes_output["labels"] == [ { From 0a672170a4cae664c309a5e1d4f25867ad52343b Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 10 Jan 2024 20:34:29 +0000 Subject: [PATCH 02/38] first commit about code structure on tokenize feature transformation --- .../config/bert_configs.py | 45 ++++++++++ .../config/config_parser.py | 3 + .../graphstorm_processing/constants.py | 2 + .../dist_feature_transformer.py | 3 + .../dist_transformations/__init__.py | 1 + .../dist_bert_transformation.py | 85 +++++++++++++++++++ .../graph_loaders/schema_utils.py | 2 + 7 files changed, 141 insertions(+) create mode 100644 graphstorm-processing/graphstorm_processing/config/bert_configs.py create mode 100644 graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py diff --git a/graphstorm-processing/graphstorm_processing/config/bert_configs.py b/graphstorm-processing/graphstorm_processing/config/bert_configs.py new file mode 100644 index 0000000000..3365338f9e --- /dev/null +++ b/graphstorm-processing/graphstorm_processing/config/bert_configs.py @@ -0,0 +1,45 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from typing import Mapping +import numbers + +from graphstorm_processing.constants import VALID_BERT_MODEL +from .feature_config_base import FeatureConfig + + +class BertConfig(FeatureConfig): + """Feature configuration for single-column numerical features. + + Supported kwargs + ---------------- + + """ + + def __init__(self, config: Mapping): + super().__init__(config) + self.bert_model = self._transformation_kwargs.get("bert_model", "none") + self.max_seq_length = self._transformation_kwargs.get("max_seq_length", "none") + + self._sanity_check() + + def _sanity_check(self) -> None: + super()._sanity_check() + assert ( + self.bert_model in VALID_BERT_MODEL + ), f"Unknown imputer requested, expected one of {VALID_BERT_MODEL}, got {self.bert_model}" + assert isinstance(self.max_seq_length, int) and self.max_seq_length > 0, \ + f"Expect bucket_cnt {self.bucket_cnt} be an integer and larger than zero." 
+ diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index 3e32dbe91a..d5958998b7 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -27,6 +27,7 @@ NumericalFeatureConfig, ) from .categorical_configs import MultiCategoricalFeatureConfig +from .bert_configs import BertConfig from .data_config_base import DataStorageConfig @@ -67,6 +68,8 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig: return FeatureConfig(feature_dict) elif transformation_name == "multi-categorical": return MultiCategoricalFeatureConfig(feature_dict) + elif transformation_name == "bert": + return BertConfig(feature_dict) else: raise RuntimeError(f"Unknown transformation name: '{transformation_name}'") diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index 6ac05eb2ef..5827f93dbf 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -43,3 +43,5 @@ ################# Numerical transformations ################ VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"] VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"] +VALID_BERT_MODEL = ["bert-base-uncased", "bert", "roberta", "albert", "camembert", "ernie", "ibert", + "luke", "mega", "mpnet", "nezha", "qdqbert","roc_bert"] diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index e325e67243..2315357978 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -26,6 
+26,7 @@ DistBucketNumericalTransformation, DistCategoryTransformation, DistMultiCategoryTransformation, + DistBertTransformation, ) @@ -57,6 +58,8 @@ def __init__(self, feature_config: FeatureConfig): self.transformation = DistCategoryTransformation(**default_kwargs, **args_dict) elif feat_type == "multi-categorical": self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict) + elif feat_type == "bert": + self.transformation = DistBertTransformation(**default_kwargs, **args_dict) else: raise NotImplementedError( f"Feature {feat_name} has type: {feat_type} that is not supported" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py index 9416a45a31..b6e05c307e 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py @@ -13,3 +13,4 @@ DistNumericalTransformation, ) from .dist_bucket_numerical_transformation import DistBucketNumericalTransformation +from .dist_bert_transformation import DistBertTransformation diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py new file mode 100644 index 0000000000..3b607d6778 --- /dev/null +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -0,0 +1,85 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import logging +from typing import Optional, Sequence +import uuid + +from pyspark.sql import DataFrame +from pyspark.sql import functions as F +from pyspark.sql.types import ArrayType, FloatType +from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct +from pyspark.ml.linalg import DenseVector +from pyspark.ml.stat import Summarizer +from pyspark.ml import Pipeline +from pyspark.ml.functions import array_to_vector, vector_to_array + +import numpy as np +import pandas as pd + +from .base_dist_transformation import DistributedTransformation +from ..spark_utils import rename_multiple_cols + + +def apply_norm( + cols: Sequence[str], bert_norm: str, input_df: DataFrame +) -> DataFrame: + """Applies a single normalizer to the imputed dataframe, individually to each of the columns + provided in the cols argument. + + Parameters + ---------- + cols : Sequence[str] + List of column names to apply normalization to. + bert_norm : str + The type of normalization to use. Valid values is "tokenize" + input_df : DataFrame + The input DataFrame to apply normalization to. + """ + + if bert_norm == "tokenize": + scaled_df = input_df + + return scaled_df + + +class DistBertTransformation(DistributedTransformation): + """Transformation to apply various forms of bert normalization to a text input. + + Parameters + ---------- + cols : Sequence[str] + List of column names to apply normalization to. + bert_norm : str + The type of normalization to use. 
Valid values is "tokenize" + """ + + def __init__( + self, cols: Sequence[str], normalizer: str, bert_model: str, max_seq_length: int + ) -> None: + super().__init__(cols) + self.cols = cols + self.bert_norm = normalizer + self.bert_model = bert_model + self.max_seq_length = max_seq_length + + def apply(self, input_df: DataFrame) -> DataFrame: + scaled_df = apply_norm(self.cols, self.bert_norm, input_df) + + return scaled_df + + @staticmethod + def get_transformation_name() -> str: + return "DistBertTransformation" \ No newline at end of file diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index c124760da6..8fdf80f255 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -101,6 +101,8 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: return StringType if feature_type in ["numerical", "bucket-numerical", "none"]: return FloatType + if feature_type in ["bert"]: + return StringType else: raise NotImplementedError(f"Unknown feature type: {feature_type}") From 7c204366f7ebe2f21d2adafc197a029229674a20 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 11 Jan 2024 21:54:39 +0000 Subject: [PATCH 03/38] add first version with udf implementation --- .../config/bert_configs.py | 2 +- .../dist_bert_transformation.py | 35 ++++++++++++++++--- .../graph_loaders/schema_utils.py | 3 +- .../gconstruct-config.json | 7 +++- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/bert_configs.py b/graphstorm-processing/graphstorm_processing/config/bert_configs.py index 3365338f9e..72c39baa93 100644 --- a/graphstorm-processing/graphstorm_processing/config/bert_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/bert_configs.py @@ -41,5 +41,5 @@ def 
_sanity_check(self) -> None: self.bert_model in VALID_BERT_MODEL ), f"Unknown imputer requested, expected one of {VALID_BERT_MODEL}, got {self.bert_model}" assert isinstance(self.max_seq_length, int) and self.max_seq_length > 0, \ - f"Expect bucket_cnt {self.bucket_cnt} be an integer and larger than zero." + f"Expect max_seq_length {self.max_seq_length} be an integer and larger than zero." diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index 3b607d6778..a47c640ff3 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -19,22 +19,24 @@ from pyspark.sql import DataFrame from pyspark.sql import functions as F -from pyspark.sql.types import ArrayType, FloatType -from pyspark.ml.feature import MinMaxScaler, Imputer, VectorAssembler, ElementwiseProduct -from pyspark.ml.linalg import DenseVector +from pyspark.sql.types import MapType, ArrayType, IntegerType, StringType from pyspark.ml.stat import Summarizer from pyspark.ml import Pipeline from pyspark.ml.functions import array_to_vector, vector_to_array +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.functions import udf import numpy as np import pandas as pd +import torch as th +from transformers import AutoTokenizer from .base_dist_transformation import DistributedTransformation from ..spark_utils import rename_multiple_cols def apply_norm( - cols: Sequence[str], bert_norm: str, input_df: DataFrame + cols: Sequence[str], bert_norm: str, max_seq_length: int, input_df: DataFrame ) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns provided in the cols 
argument. @@ -52,6 +54,28 @@ def apply_norm( if bert_norm == "tokenize": scaled_df = input_df + # Initialize the tokenizer + tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") + + def tokenize(text): + # Check if text is a string + if not isinstance(text, str): + raise ValueError("The input of the tokenizer has to be a string.") + + # Tokenize the text + t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='pt') + result = { + 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list + 'attention_mask': t['attention_mask'][0].to(th.int8).tolist(), + 'token_type_ids': t.get('token_type_ids', th.zeros_like(t['input_ids'])).to(th.int8)[0].tolist() + } + return result + + # Define the UDF with the appropriate return type + tokenize_udf = udf(tokenize, MapType(StringType(), ArrayType(IntegerType()))) + + # Apply the UDF to the DataFrame + scaled_df = input_df.withColumn(cols[0], tokenize_udf(input_df[cols[0]])) return scaled_df @@ -71,12 +95,13 @@ def __init__( ) -> None: super().__init__(cols) self.cols = cols + assert len(self.cols) == 1, "Bert transformation only supports single column" self.bert_norm = normalizer self.bert_model = bert_model self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: - scaled_df = apply_norm(self.cols, self.bert_norm, input_df) + scaled_df = apply_norm(self.cols, self.bert_norm, self.max_seq_length, input_df) return scaled_df diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index 8fdf80f255..facc9ba9b2 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -97,12 +97,11 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: "multi-numerical", "categorical", "multi-categorical", + "bert" ] or 
feature_type.startswith("text"): return StringType if feature_type in ["numerical", "bucket-numerical", "none"]: return FloatType - if feature_type in ["bert"]: - return StringType else: raise NotImplementedError(f"Unknown feature type: {feature_type}") diff --git a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gconstruct-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gconstruct-config.json index baa103839c..76ecdfbb1d 100644 --- a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gconstruct-config.json +++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gconstruct-config.json @@ -58,7 +58,12 @@ ], "features": [ { - "feature_col": ["age"] + "feature_col": ["occupation"], + "transform": { + "name": "tokenize_hf", + "bert_model": "bert-base-uncased", + "max_seq_length": 16 + } } ] } From 626565dc8ab9cf33e663799784f49b62fe16936d Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 17 Jan 2024 21:50:39 +0000 Subject: [PATCH 04/38] remove torch related --- .../dist_transformations/dist_bert_transformation.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index a47c640ff3..c45e359ff3 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -28,7 +28,6 @@ import numpy as np import pandas as pd -import torch as th from transformers import AutoTokenizer from .base_dist_transformation import DistributedTransformation @@ -47,13 +46,13 @@ def apply_norm( List of column names to apply normalization to. bert_norm : str The type of normalization to use. 
Valid values is "tokenize" + max_seq_length : int + The maximal length of the tokenization results. input_df : DataFrame The input DataFrame to apply normalization to. """ if bert_norm == "tokenize": - scaled_df = input_df - # Initialize the tokenizer tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") @@ -63,7 +62,7 @@ def tokenize(text): raise ValueError("The input of the tokenizer has to be a string.") # Tokenize the text - t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='pt') + t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') result = { 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list 'attention_mask': t['attention_mask'][0].to(th.int8).tolist(), @@ -76,6 +75,7 @@ def tokenize(text): # Apply the UDF to the DataFrame scaled_df = input_df.withColumn(cols[0], tokenize_udf(input_df[cols[0]])) + return scaled_df From cfebe1de7adad65bfb1e2a98ea5d67bdf1bcddd2 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 17 Jan 2024 22:27:49 +0000 Subject: [PATCH 05/38] remove torch --- .../dist_transformations/dist_bert_transformation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index c45e359ff3..fdc3a03c0f 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -63,10 +63,11 @@ def tokenize(text): # Tokenize the text t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') + token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], 
dtype=np.int8)) result = { 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list - 'attention_mask': t['attention_mask'][0].to(th.int8).tolist(), - 'token_type_ids': t.get('token_type_ids', th.zeros_like(t['input_ids'])).to(th.int8)[0].tolist() + 'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(), + 'token_type_ids': token_type_ids[0].astype(np.int8).tolist() } return result From 32da0c8a6771a27064e59d4a505a84348afde99c Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 18 Jan 2024 00:03:23 +0000 Subject: [PATCH 06/38] add doc --- .../graphstorm_processing/config/bert_configs.py | 11 ++++++----- .../graphstorm_processing/constants.py | 2 -- .../dist_transformations/dist_bert_transformation.py | 9 ++++++++- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/bert_configs.py b/graphstorm-processing/graphstorm_processing/config/bert_configs.py index 72c39baa93..9b2b1c0d44 100644 --- a/graphstorm-processing/graphstorm_processing/config/bert_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/bert_configs.py @@ -16,7 +16,6 @@ from typing import Mapping import numbers -from graphstorm_processing.constants import VALID_BERT_MODEL from .feature_config_base import FeatureConfig @@ -25,7 +24,10 @@ class BertConfig(FeatureConfig): Supported kwargs ---------------- - + bert_model: str, required + The name of the lm model. + max_seq_length: int, required + The maximal length of the tokenization results. 
""" def __init__(self, config: Mapping): @@ -37,9 +39,8 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() - assert ( - self.bert_model in VALID_BERT_MODEL - ), f"Unknown imputer requested, expected one of {VALID_BERT_MODEL}, got {self.bert_model}" + assert isinstance(self.bert_model, str),\ + f"Expect bert_model to be a string, but got {self.bert_model}" assert isinstance(self.max_seq_length, int) and self.max_seq_length > 0, \ f"Expect max_seq_length {self.max_seq_length} be an integer and larger than zero." diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index 5827f93dbf..6ac05eb2ef 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -43,5 +43,3 @@ ################# Numerical transformations ################ VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"] VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"] -VALID_BERT_MODEL = ["bert-base-uncased", "bert", "roberta", "albert", "camembert", "ernie", "ibert", - "luke", "mega", "mpnet", "nezha", "qdqbert","roc_bert"] diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index fdc3a03c0f..e778658408 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -45,7 +45,7 @@ def apply_norm( cols : Sequence[str] List of column names to apply normalization to. bert_norm : str - The type of normalization to use. Valid values is "tokenize" + The type of normalization to use. 
Valid value is "tokenize" max_seq_length : int The maximal length of the tokenization results. input_df : DataFrame @@ -62,6 +62,9 @@ def tokenize(text): raise ValueError("The input of the tokenizer has to be a string.") # Tokenize the text + # Instead of doing the similar thing as what we do in the GConstruct, it is suggested + # to use numpy here to refactor the data type. So it is not necessary to introduce the + # torch dependency here t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) result = { @@ -89,6 +92,10 @@ class DistBertTransformation(DistributedTransformation): List of column names to apply normalization to. bert_norm : str The type of normalization to use. Valid values is "tokenize" + bert_model: str + The name of the lm model. + max_seq_length: int + The maximal length of the tokenization results. """ def __init__( From 18d61e4e513dd6b08c3eb8b64d02c6a6e81f40c6 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 23 Jan 2024 21:52:00 +0000 Subject: [PATCH 07/38] add --- .../dist_bert_transformation.py | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index e778658408..27990b68f8 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -17,9 +17,9 @@ from typing import Optional, Sequence import uuid -from pyspark.sql import DataFrame +from pyspark.sql import DataFrame, SparkSession from pyspark.sql import functions as F -from pyspark.sql.types import 
MapType, ArrayType, IntegerType, StringType +from pyspark.sql.types import MapType, ArrayType, IntegerType, StringType, StructType, StructField from pyspark.ml.stat import Summarizer from pyspark.ml import Pipeline from pyspark.ml.functions import array_to_vector, vector_to_array @@ -45,26 +45,32 @@ def apply_norm( cols : Sequence[str] List of column names to apply normalization to. bert_norm : str - The type of normalization to use. Valid value is "tokenize" - max_seq_length : int - The maximal length of the tokenization results. + The type of normalization to use. Valid values is "tokenize" input_df : DataFrame The input DataFrame to apply normalization to. """ if bert_norm == "tokenize": + scaled_df = input_df + # Initialize the tokenizer tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") + # Define the schema of your return type + schema = StructType([ + StructField("input_ids", ArrayType(IntegerType())), + StructField("attention_mask", ArrayType(IntegerType())), + StructField("token_type_ids", ArrayType(IntegerType())) + ]) + + # Define UDF + @udf(returnType=schema, useArrow=True) def tokenize(text): # Check if text is a string if not isinstance(text, str): raise ValueError("The input of the tokenizer has to be a string.") # Tokenize the text - # Instead of doing the similar thing as what we do in the GConstruct, it is suggested - # to use numpy here to refactor the data type. 
So it is not necessary to introduce the - # torch dependency here t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) result = { @@ -74,12 +80,30 @@ def tokenize(text): } return result - # Define the UDF with the appropriate return type - tokenize_udf = udf(tokenize, MapType(StringType(), ArrayType(IntegerType()))) - # Apply the UDF to the DataFrame - scaled_df = input_df.withColumn(cols[0], tokenize_udf(input_df[cols[0]])) - + scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) + + # @udf(returnType=schema, useArrow=True) + # def tokenize(text): + # # Check if text is a string + # if not isinstance(text, str): + # raise ValueError("The input of the tokenizer has to be a string.") + # + # # Tokenize the text + # t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') + # token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) + # result = { + # 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list + # 'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(), + # 'token_type_ids': token_type_ids[0].astype(np.int8).tolist() + # } + # return result + # + # # Define the UDF with the appropriate return type + # tokenize_udf = udf(tokenize, MapType(StringType(), ArrayType(IntegerType()))) + # + # # Apply the UDF to the DataFrame + # scaled_df = input_df.withColumn(cols[0], tokenize_udf(input_df[cols[0]])) return scaled_df @@ -92,10 +116,6 @@ class DistBertTransformation(DistributedTransformation): List of column names to apply normalization to. bert_norm : str The type of normalization to use. Valid values is "tokenize" - bert_model: str - The name of the lm model. - max_seq_length: int - The maximal length of the tokenization results. 
""" def __init__( @@ -107,6 +127,7 @@ def __init__( self.bert_norm = normalizer self.bert_model = bert_model self.max_seq_length = max_seq_length + self.spark = spark def apply(self, input_df: DataFrame) -> DataFrame: scaled_df = apply_norm(self.cols, self.bert_norm, self.max_seq_length, input_df) From 178da9f00dc9b74a38bbc1946ee122ad86a481dc Mon Sep 17 00:00:00 2001 From: JalenCato Date: Tue, 23 Jan 2024 21:55:27 +0000 Subject: [PATCH 08/38] add --- .../dist_bert_transformation.py | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index 27990b68f8..1883013f0c 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -82,28 +82,7 @@ def tokenize(text): # Apply the UDF to the DataFrame scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) - - # @udf(returnType=schema, useArrow=True) - # def tokenize(text): - # # Check if text is a string - # if not isinstance(text, str): - # raise ValueError("The input of the tokenizer has to be a string.") - # - # # Tokenize the text - # t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') - # token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) - # result = { - # 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list - # 'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(), - # 'token_type_ids': token_type_ids[0].astype(np.int8).tolist() - # } - # return result - # - # # Define the UDF with the appropriate return type - # tokenize_udf = udf(tokenize, 
MapType(StringType(), ArrayType(IntegerType()))) - # - # # Apply the UDF to the DataFrame - # scaled_df = input_df.withColumn(cols[0], tokenize_udf(input_df[cols[0]])) + return scaled_df From 8a0f872a9a2e2bd06dee9a996e54df70dd653c4c Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 25 Jan 2024 21:27:11 +0000 Subject: [PATCH 09/38] add fix --- .../dist_transformations/dist_bert_transformation.py | 6 +++--- .../graphstorm_processing/graph_loaders/schema_utils.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index 1883013f0c..1284f50839 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -62,9 +62,10 @@ def apply_norm( StructField("attention_mask", ArrayType(IntegerType())), StructField("token_type_ids", ArrayType(IntegerType())) ]) + schema = ArrayType(IntegerType()) # Define UDF - @udf(returnType=schema, useArrow=True) + @udf(returnType=schema) def tokenize(text): # Check if text is a string if not isinstance(text, str): @@ -78,7 +79,7 @@ def tokenize(text): 'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(), 'token_type_ids': token_type_ids[0].astype(np.int8).tolist() } - return result + return result['input_ids'] # Apply the UDF to the DataFrame scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) @@ -106,7 +107,6 @@ def __init__( self.bert_norm = normalizer self.bert_model = bert_model self.max_seq_length = max_seq_length - self.spark = spark def apply(self, input_df: DataFrame) -> DataFrame: scaled_df = apply_norm(self.cols, self.bert_norm, self.max_seq_length, input_df) diff --git 
a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index 67945c2e26..d7f334efbf 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -61,7 +61,8 @@ def _parse_features_schema(features_objects: Sequence[FeatureConfig]) -> Sequenc feature_type = feature_config.feat_type for feature_col, _ in zip(feature_config.cols, feature_config.feat_name): spark_feature_type = determine_spark_feature_type(feature_type) - + if StructField(feature_col, spark_feature_type(), True) in field_list: + continue field_list.append(StructField(feature_col, spark_feature_type(), True)) return field_list From 69edfd9e48df4c3a7bf45f5313c956f909c2261d Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 20:14:16 +0000 Subject: [PATCH 10/38] rename --- .../config_conversion/gconstruct_converter.py | 4 +- .../config/config_parser.py | 2 +- .../graphstorm_processing/constants.py | 4 + .../dist_feature_transformer.py | 2 +- .../dist_bert_transformation.py | 31 ++++--- .../dist_heterogeneous_loader.py | 90 +++++++++---------- .../graph_loaders/schema_utils.py | 2 +- graphstorm-processing/tests/test_converter.py | 8 +- 8 files changed, 73 insertions(+), 70 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index d03cb380f4..1940a62c02 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -135,9 +135,9 @@ def _convert_feature(feats: list[dict]) -> list[dict]: gsp_transformation_dict["name"] = "categorical" gsp_transformation_dict["kwargs"] = {} elif 
gconstruct_transform_dict["name"] == "tokenize_hf": - gsp_transformation_dict["name"] = "bert" + gsp_transformation_dict["name"] = "huggingface" gsp_transformation_dict["kwargs"] = { - "normalizer": "tokenize", + "normalizer": "tokenize_hf", "bert_model": gconstruct_transform_dict["bert_model"], "max_seq_length": gconstruct_transform_dict["max_seq_length"] } diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index d5958998b7..18cbb69877 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -68,7 +68,7 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig: return FeatureConfig(feature_dict) elif transformation_name == "multi-categorical": return MultiCategoricalFeatureConfig(feature_dict) - elif transformation_name == "bert": + elif transformation_name == "huggingface": return BertConfig(feature_dict) else: raise RuntimeError(f"Unknown transformation name: '{transformation_name}'") diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index 6ac05eb2ef..7ce0967746 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -43,3 +43,7 @@ ################# Numerical transformations ################ VALID_IMPUTERS = ["none", "mean", "median", "most_frequent"] VALID_NORMALIZERS = ["none", "min-max", "standard", "rank-gauss"] + +################# Bert transformations ################ +HUGGINGFACE_TRANFORM = "huggingface" +HUGGINGFACE_TOKENIZE = "tokenize_hf" \ No newline at end of file diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index 2315357978..04ed695004 
100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -58,7 +58,7 @@ def __init__(self, feature_config: FeatureConfig): self.transformation = DistCategoryTransformation(**default_kwargs, **args_dict) elif feat_type == "multi-categorical": self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict) - elif feat_type == "bert": + elif feat_type == "huggingface": self.transformation = DistBertTransformation(**default_kwargs, **args_dict) else: raise NotImplementedError( diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py index 1284f50839..b7b590fa43 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py @@ -32,10 +32,11 @@ from .base_dist_transformation import DistributedTransformation from ..spark_utils import rename_multiple_cols +from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE def apply_norm( - cols: Sequence[str], bert_norm: str, max_seq_length: int, input_df: DataFrame + cols: Sequence[str], bert_norm: str, bert_model: str, max_seq_length: int, input_df: DataFrame ) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns provided in the cols argument. @@ -46,13 +47,13 @@ def apply_norm( List of column names to apply normalization to. bert_norm : str The type of normalization to use. Valid values is "tokenize" + bert_model : str + The name of huggingface model. input_df : DataFrame The input DataFrame to apply normalization to. 
""" - if bert_norm == "tokenize": - scaled_df = input_df - + if bert_norm == HUGGINGFACE_TOKENIZE: # Initialize the tokenizer tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") @@ -62,7 +63,6 @@ def apply_norm( StructField("attention_mask", ArrayType(IntegerType())), StructField("token_type_ids", ArrayType(IntegerType())) ]) - schema = ArrayType(IntegerType()) # Define UDF @udf(returnType=schema) @@ -74,16 +74,21 @@ def tokenize(text): # Tokenize the text t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) - result = { - 'input_ids': t['input_ids'][0].tolist(), # Convert tensor to list - 'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(), - 'token_type_ids': token_type_ids[0].astype(np.int8).tolist() - } - return result['input_ids'] + result = ( + t['input_ids'][0].tolist(), # Convert tensor to list + t['attention_mask'][0].astype(np.int8).tolist(), + token_type_ids[0].astype(np.int8).tolist() + ) + + return result # Apply the UDF to the DataFrame scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) - + scaled_df = scaled_df.select( + scaled_df[cols[0]].getItem("input_ids").alias("input_ids"), + scaled_df[cols[0]].getItem("attention_mask").alias("attention_mask"), + scaled_df[cols[0]].getItem("token_type_ids").alias("token_type_ids"), + ) return scaled_df @@ -109,7 +114,7 @@ def __init__( self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: - scaled_df = apply_norm(self.cols, self.bert_norm, self.max_seq_length, input_df) + scaled_df = apply_norm(self.cols, self.bert_norm, self.bert_model, self.max_seq_length, input_df) return scaled_df diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 
29462abe2e..112fc9952c 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -40,6 +40,8 @@ VALUE_COUNTS, COLUMN_NAME, SPECIAL_CHARACTERS, + HUGGINGFACE_TRANFORM, + HUGGINGFACE_TOKENIZE, ) from ..config.config_parser import EdgeConfig, NodeConfig, StructureConfig from ..config.label_config_base import LabelConfig @@ -937,22 +939,10 @@ def _process_node_features( transformed_feature_df = transformer.apply_transformation(nodes_df) # TODO: Remove hack with [feat_conf.feat_name] - for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): - node_transformation_start = perf_counter() - single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( - feat_col, feat_name - ) - - feature_output_path = os.path.join( - self.output_prefix, f"node_data/{node_type}-{feat_name}" - ) - - logging.info( - "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path - ) - path_list = self._write_df( - single_feature_df, feature_output_path, out_format="parquet" - ) + def process_feature(self, feat_name, single_feature_df, node_type): + feature_output_path = os.path.join(self.output_prefix, f"node_data/{node_type}-{feat_name}") + logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path) + path_list = self._write_df(single_feature_df, feature_output_path, out_format="parquet") node_feature_metadata_dict = { "format": {"name": FORMAT_NAME, "delimiter": DELIMITER}, @@ -960,19 +950,24 @@ def _process_node_features( } node_type_feature_metadata[feat_name] = node_feature_metadata_dict - self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = ( - perf_counter() - node_transformation_start - ) + feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None) + nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) + 
ntype_feat_sizes.update({feat_name: nfeat_size}) - feat_val = single_feature_df.take(1)[0].asDict()[feat_name] + self.timers[ + f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = perf_counter() - node_transformation_start - if isinstance(feat_val, (int, float)): - nfeat_size = 1 - else: - nfeat_size = len(feat_val) - - ntype_feat_sizes.update({feat_name: nfeat_size}) + for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): + node_transformation_start = perf_counter() + print(feat_conf.feat_type, feat_conf.transformation_kwargs["normalizer"]) + if feat_conf.feat_type == HUGGINGFACE_TRANFORM and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE: + for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: + single_feature_df = transformed_feature_df.select(bert_feat_name) + process_feature(self, bert_feat_name, single_feature_df, node_type) + else: + single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(feat_col, feat_name) + process_feature(self, feat_name, single_feature_df, node_type) return node_type_feature_metadata, ntype_feat_sizes def _process_node_labels( @@ -1337,37 +1332,36 @@ def _process_edge_features( transformed_feature_df = transformer.apply_transformation(edges_df) - for feat_name, feat_col in zip(feat_conf.feat_name, feat_conf.cols): - edge_feature_start = perf_counter() - single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( - feat_col, feat_name - ) - feature_output_path = os.path.join( - self.output_prefix, f"edge_data/{edge_type}-{feat_name}" - ) - - path_list = self._write_df( - single_feature_df, feature_output_path, out_format=FORMAT_NAME - ) + # TODO: Remove hack with [feat_conf.feat_name] + def process_feature(self, feat_name, single_feature_df, edge_type): + feature_output_path = os.path.join(self.output_prefix, f"edge_data/{edge_type}-{feat_name}") + logging.info("Writing output for feat_name: '%s' to %s", feat_name, 
feature_output_path) + path_list = self._write_df(single_feature_df, feature_output_path, out_format='parquet') edge_feature_metadata_dict = { "format": {"name": FORMAT_NAME, "delimiter": DELIMITER}, "data": path_list, } - edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict + edge_type_feature_metadata[feat_name] = edge_feature_metadata_dict - self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = ( - perf_counter() - edge_feature_start - ) + feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None) + nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) + etype_feat_sizes.update({feat_name: efeat_size}) - feat_val = single_feature_df.take(1)[0].asDict()[feat_name] + self.timers[ + f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = perf_counter() - edge_feature_start - if isinstance(feat_val, numbers.Number): - efeat_size = 1 - else: - efeat_size = len(feat_val) + for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): + edge_feature_start = perf_counter() - etype_feat_sizes.update({feat_name: efeat_size}) + if feat_conf.feat_type == HUGGINGFACE_TRANFORM and feat_conf.transformation_kwargs[ + "normalizer"] == HUGGINGFACE_TOKENIZE: + for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: + single_feature_df = transformed_feature_df.select(bert_feat_name) + process_feature(self, bert_feat_name, single_feature_df, edge_type) + else: + single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(feat_col, feat_name) + process_feature(self, feat_name, single_feature_df, edge_type) return edge_feature_metadata_dicts, etype_feat_sizes diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index d7f334efbf..2448b09c6e 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ 
b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -92,7 +92,7 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: "multi-numerical", "categorical", "multi-categorical", - "bert" + "huggingface" ] or feature_type.startswith("text"): return StringType if feature_type in ["numerical", "bucket-numerical"]: diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index 8dc883f516..49f9d300a3 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -56,7 +56,7 @@ def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node node_dict["nodes"][0]["features"] = [ { "feature_col": ["paper_title"], - "transform": {"name": "bert_hf"}, + "transform": {"name": "tokenize_hf"}, } ] @@ -328,8 +328,8 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): { "column": "citation_name", "transformation": { - "name": "bert", - "kwargs": {"normalizer": "tokenize", "bert_model": "bert", "max_seq_length": 64}, + "name": "huggingface", + "kwargs": {"normalizer": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, }, }, ] @@ -362,4 +362,4 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "type": "classification", "split_rate": {"train": 0.9, "val": 0.1, "test": 0.0}, }, - ] + ] \ No newline at end of file From c26aa6813d2f2169d44c718091442351036c9bfc Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 20:24:03 +0000 Subject: [PATCH 11/38] rename --- .../graphstorm_processing/config/config_parser.py | 4 ++-- .../config/{bert_configs.py => hf_configs.py} | 2 +- .../data_transformations/dist_feature_transformer.py | 4 ++-- .../data_transformations/dist_transformations/__init__.py | 2 +- ...{dist_bert_transformation.py => dist_hf_transformation.py} | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) rename 
graphstorm-processing/graphstorm_processing/config/{bert_configs.py => hf_configs.py} (97%) rename graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/{dist_bert_transformation.py => dist_hf_transformation.py} (97%) diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index 18cbb69877..0caf3f26d2 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -27,7 +27,7 @@ NumericalFeatureConfig, ) from .categorical_configs import MultiCategoricalFeatureConfig -from .bert_configs import BertConfig +from .hf_configs import HFConfig from .data_config_base import DataStorageConfig @@ -69,7 +69,7 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig: elif transformation_name == "multi-categorical": return MultiCategoricalFeatureConfig(feature_dict) elif transformation_name == "huggingface": - return BertConfig(feature_dict) + return HFConfig(feature_dict) else: raise RuntimeError(f"Unknown transformation name: '{transformation_name}'") diff --git a/graphstorm-processing/graphstorm_processing/config/bert_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py similarity index 97% rename from graphstorm-processing/graphstorm_processing/config/bert_configs.py rename to graphstorm-processing/graphstorm_processing/config/hf_configs.py index 9b2b1c0d44..2123c488a1 100644 --- a/graphstorm-processing/graphstorm_processing/config/bert_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -19,7 +19,7 @@ from .feature_config_base import FeatureConfig -class BertConfig(FeatureConfig): +class HFConfig(FeatureConfig): """Feature configuration for single-column numerical features. 
Supported kwargs diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index 04ed695004..05af3b9634 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -26,7 +26,7 @@ DistBucketNumericalTransformation, DistCategoryTransformation, DistMultiCategoryTransformation, - DistBertTransformation, + DistHFTransformation, ) @@ -59,7 +59,7 @@ def __init__(self, feature_config: FeatureConfig): elif feat_type == "multi-categorical": self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict) elif feat_type == "huggingface": - self.transformation = DistBertTransformation(**default_kwargs, **args_dict) + self.transformation = DistHFTransformation(**default_kwargs, **args_dict) else: raise NotImplementedError( f"Feature {feat_name} has type: {feat_type} that is not supported" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py index b6e05c307e..a862ef967a 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py @@ -13,4 +13,4 @@ DistNumericalTransformation, ) from .dist_bucket_numerical_transformation import DistBucketNumericalTransformation -from .dist_bert_transformation import DistBertTransformation +from .dist_hf_transformation import DistHFTransformation diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py 
b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py similarity index 97% rename from graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py rename to graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index b7b590fa43..4137da2d70 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_bert_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -92,7 +92,7 @@ def tokenize(text): return scaled_df -class DistBertTransformation(DistributedTransformation): +class DistHFTransformation(DistributedTransformation): """Transformation to apply various forms of bert normalization to a text input. Parameters @@ -120,4 +120,4 @@ def apply(self, input_df: DataFrame) -> DataFrame: @staticmethod def get_transformation_name() -> str: - return "DistBertTransformation" \ No newline at end of file + return "DistHFTransformation" \ No newline at end of file From ad74f46cb818552215519c41eed79401677c1f16 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 21:22:56 +0000 Subject: [PATCH 12/38] add test for huggingface --- .../test_dist_huggingface_transformation.py | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 graphstorm-processing/tests/test_dist_huggingface_transformation.py diff --git a/graphstorm-processing/tests/test_dist_huggingface_transformation.py b/graphstorm-processing/tests/test_dist_huggingface_transformation.py new file mode 100644 index 0000000000..7fab5d33fc --- /dev/null +++ b/graphstorm-processing/tests/test_dist_huggingface_transformation.py @@ -0,0 +1,72 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). 
+You may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import pytest +from pyspark.sql import DataFrame, SparkSession +import numpy as np +from numpy.testing import assert_array_equal +from transformers import AutoTokenizer + +from graphstorm_processing.data_transformations.dist_transformations import ( + DistHFTransformation, +) + + +def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): + # Prepare test data and DataFrame + data = [ + ("mark", "doctor", None), + ("john", "scientist", 10000), + ("tara", "engineer", 20000), + ("jen", "nurse", 10000), + ] + columns = ["name", "occupation", "salary"] + input_df = spark.createDataFrame(data, schema=columns) + + # Configuration for Hugging Face tokenizer transformation + bert_model = "bert-base-uncased" + max_seq_length = 8 + + # Initialize and apply the distributed Hugging Face tokenization transformation + hf_tokenize = DistHFTransformation(["occupation"], "tokenize_hf", bert_model, max_seq_length) + output_df = hf_tokenize.apply(input_df) + assert len(output_df) == 3, "the output for huggingface tokenize should have three columns" + + # Validate the schema of the transformed DataFrame + for feature in ["input_ids", "attention_mask", "token_type_ids"]: + feature_df = output_df.select(feature) + check_df_schema(feature_df) + + # Collect the output data for comparison + output_data = feature_df.collect() + + # Tokenize the original text data for validation + original_text = [row[1] for row in data] + tokenizer = AutoTokenizer.from_pretrained(bert_model) + tokenized_data = tokenizer( + 
original_text, + max_length=max_seq_length, + truncation=True, + padding="max_length", + return_tensors="np", + ) + + # Compare the Spark DataFrame output with the expected tokenizer output + expected_output = tokenized_data[feature] + for idx, row in enumerate(output_data): + assert_array_equal( + row[0], expected_output[idx], err_msg=f"Row {idx} for {feature} is not equal" + ) From d7bccff003cb74fa83e53884def0202a758e326a Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 21:53:33 +0000 Subject: [PATCH 13/38] black reformat --- .../config_conversion/gconstruct_converter.py | 3 +- .../config/hf_configs.py | 12 ++-- .../graphstorm_processing/constants.py | 3 +- .../dist_feature_transformer.py | 1 + .../dist_hf_transformation.py | 35 +++++++---- .../dist_heterogeneous_loader.py | 60 +++++++++++++------ .../graph_loaders/schema_utils.py | 3 +- 7 files changed, 79 insertions(+), 38 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index 1940a62c02..4c772fb9b2 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" + from typing import Any from .converter_base import ConfigConverter @@ -139,7 +140,7 @@ def _convert_feature(feats: list[dict]) -> list[dict]: gsp_transformation_dict["kwargs"] = { "normalizer": "tokenize_hf", "bert_model": gconstruct_transform_dict["bert_model"], - "max_seq_length": gconstruct_transform_dict["max_seq_length"] + "max_seq_length": gconstruct_transform_dict["max_seq_length"], } # TODO: Add support for other common transformations here else: diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index 2123c488a1..0d73975e5a 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ + from typing import Mapping import numbers @@ -39,8 +40,9 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() - assert isinstance(self.bert_model, str),\ - f"Expect bert_model to be a string, but got {self.bert_model}" - assert isinstance(self.max_seq_length, int) and self.max_seq_length > 0, \ - f"Expect max_seq_length {self.max_seq_length} be an integer and larger than zero." - + assert isinstance( + self.bert_model, str + ), f"Expect bert_model to be a string, but got {self.bert_model}" + assert ( + isinstance(self.max_seq_length, int) and self.max_seq_length > 0 + ), f"Expect max_seq_length {self.max_seq_length} be an integer and larger than zero." 
diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index 7ce0967746..f848d41a50 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ + ################### Categorical Limits ####################### MAX_CATEGORIES_PER_FEATURE = 100 RARE_CATEGORY = "GSP_CONSTANT_OTHER" @@ -46,4 +47,4 @@ ################# Bert transformations ################ HUGGINGFACE_TRANFORM = "huggingface" -HUGGINGFACE_TOKENIZE = "tokenize_hf" \ No newline at end of file +HUGGINGFACE_TOKENIZE = "tokenize_hf" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index 05af3b9634..5527960b6a 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ + import logging from pyspark.sql import DataFrame diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index 4137da2d70..e58d757068 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" + import logging from typing import Optional, Sequence import uuid @@ -58,11 +59,13 @@ def apply_norm( tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") # Define the schema of your return type - schema = StructType([ - StructField("input_ids", ArrayType(IntegerType())), - StructField("attention_mask", ArrayType(IntegerType())), - StructField("token_type_ids", ArrayType(IntegerType())) - ]) + schema = StructType( + [ + StructField("input_ids", ArrayType(IntegerType())), + StructField("attention_mask", ArrayType(IntegerType())), + StructField("token_type_ids", ArrayType(IntegerType())), + ] + ) # Define UDF @udf(returnType=schema) @@ -72,12 +75,18 @@ def tokenize(text): raise ValueError("The input of the tokenizer has to be a string.") # Tokenize the text - t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np') - token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8)) + t = tokenizer( + text, + max_length=max_seq_length, + truncation=True, + padding="max_length", + return_tensors="np", + ) + token_type_ids = t.get("token_type_ids", np.zeros_like(t["input_ids"], dtype=np.int8)) result = ( - t['input_ids'][0].tolist(), # Convert tensor to list - t['attention_mask'][0].astype(np.int8).tolist(), - token_type_ids[0].astype(np.int8).tolist() + t["input_ids"][0].tolist(), # Convert tensor to list + t["attention_mask"][0].astype(np.int8).tolist(), + token_type_ids[0].astype(np.int8).tolist(), ) return result @@ -114,10 +123,12 @@ def __init__( self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: - scaled_df = apply_norm(self.cols, self.bert_norm, self.bert_model, self.max_seq_length, input_df) + scaled_df = apply_norm( + self.cols, self.bert_norm, self.bert_model, self.max_seq_length, input_df + ) return scaled_df @staticmethod def get_transformation_name() -> str: - return "DistHFTransformation" \ No newline at end of file + return 
"DistHFTransformation" diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 112fc9952c..dee818270a 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ + import json import logging import numbers @@ -940,9 +941,15 @@ def _process_node_features( # TODO: Remove hack with [feat_conf.feat_name] def process_feature(self, feat_name, single_feature_df, node_type): - feature_output_path = os.path.join(self.output_prefix, f"node_data/{node_type}-{feat_name}") - logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path) - path_list = self._write_df(single_feature_df, feature_output_path, out_format="parquet") + feature_output_path = os.path.join( + self.output_prefix, f"node_data/{node_type}-{feat_name}" + ) + logging.info( + "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path + ) + path_list = self._write_df( + single_feature_df, feature_output_path, out_format=FORMAT_NAME + ) node_feature_metadata_dict = { "format": {"name": FORMAT_NAME, "delimiter": DELIMITER}, @@ -954,19 +961,24 @@ def process_feature(self, feat_name, single_feature_df, node_type): nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) ntype_feat_sizes.update({feat_name: nfeat_size}) - self.timers[ - f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = perf_counter() - node_transformation_start + self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = ( + perf_counter() - node_transformation_start + ) for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): node_transformation_start = 
perf_counter() - print(feat_conf.feat_type, feat_conf.transformation_kwargs["normalizer"]) - if feat_conf.feat_type == HUGGINGFACE_TRANFORM and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE: + if ( + feat_conf.feat_type == HUGGINGFACE_TRANFORM + and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE + ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) process_feature(self, bert_feat_name, single_feature_df, node_type) else: - single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(feat_col, feat_name) + single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( + feat_col, feat_name + ) process_feature(self, feat_name, single_feature_df, node_type) return node_type_feature_metadata, ntype_feat_sizes @@ -1206,6 +1218,7 @@ def process_edge_data(self, edge_configs: Sequence[EdgeConfig]) -> Tuple[Dict, D edge_data_dict = {} edges_dict = {} logging.info("Processing edge data...") + self.graph_info["efeat_size"] = {} self.graph_info["etype_label"] = [] self.graph_info["etype_label_property"] = [] for edge_config in edge_configs: @@ -1334,33 +1347,44 @@ def _process_edge_features( # TODO: Remove hack with [feat_conf.feat_name] def process_feature(self, feat_name, single_feature_df, edge_type): - feature_output_path = os.path.join(self.output_prefix, f"edge_data/{edge_type}-{feat_name}") - logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path) - path_list = self._write_df(single_feature_df, feature_output_path, out_format='parquet') + feature_output_path = os.path.join( + self.output_prefix, f"edge_data/{edge_type}-{feat_name}" + ) + logging.info( + "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path + ) + path_list = self._write_df( + single_feature_df, feature_output_path, out_format="parquet" + ) edge_feature_metadata_dict = { "format": 
{"name": FORMAT_NAME, "delimiter": DELIMITER}, "data": path_list, } - edge_type_feature_metadata[feat_name] = edge_feature_metadata_dict + edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None) - nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) + efeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) etype_feat_sizes.update({feat_name: efeat_size}) - self.timers[ - f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = perf_counter() - edge_feature_start + self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = ( + perf_counter() - edge_feature_start + ) for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): edge_feature_start = perf_counter() - if feat_conf.feat_type == HUGGINGFACE_TRANFORM and feat_conf.transformation_kwargs[ - "normalizer"] == HUGGINGFACE_TOKENIZE: + if ( + feat_conf.feat_type == HUGGINGFACE_TRANFORM + and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE + ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) process_feature(self, bert_feat_name, single_feature_df, edge_type) else: - single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(feat_col, feat_name) + single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( + feat_col, feat_name + ) process_feature(self, feat_name, single_feature_df, edge_type) return edge_feature_metadata_dicts, etype_feat_sizes diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index 2448b09c6e..19ed03869d 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -16,6 
+16,7 @@ This module is used to parse the schema of CSV input files, inferring the type of the columns from the type mentioned in the configuration. """ + import logging from typing import Sequence, List, Type @@ -92,7 +93,7 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: "multi-numerical", "categorical", "multi-categorical", - "huggingface" + "huggingface", ] or feature_type.startswith("text"): return StringType if feature_type in ["numerical", "bucket-numerical"]: From bd8c075787fd4628aece977f3ade84c777285d1a Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 22:24:44 +0000 Subject: [PATCH 14/38] apply lint --- .../config/hf_configs.py | 1 - .../dist_hf_transformation.py | 19 ++++--------------- .../dist_heterogeneous_loader.py | 4 +++- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index 0d73975e5a..ba55f1ce14 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -15,7 +15,6 @@ """ from typing import Mapping -import numbers from .feature_config_base import FeatureConfig diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index e58d757068..be83f94a54 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -14,26 +14,15 @@ limitations under the License. 
""" -import logging -from typing import Optional, Sequence -import uuid - -from pyspark.sql import DataFrame, SparkSession -from pyspark.sql import functions as F -from pyspark.sql.types import MapType, ArrayType, IntegerType, StringType, StructType, StructField -from pyspark.ml.stat import Summarizer -from pyspark.ml import Pipeline -from pyspark.ml.functions import array_to_vector, vector_to_array -from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql import DataFrame +from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField from pyspark.sql.functions import udf import numpy as np -import pandas as pd from transformers import AutoTokenizer -from .base_dist_transformation import DistributedTransformation -from ..spark_utils import rename_multiple_cols from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE +from .base_dist_transformation import DistributedTransformation def apply_norm( @@ -56,7 +45,7 @@ def apply_norm( if bert_norm == HUGGINGFACE_TOKENIZE: # Initialize the tokenizer - tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") + tokenizer = AutoTokenizer.from_pretrained(bert_model) # Define the schema of your return type schema = StructType( diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index dee818270a..e4e3beda50 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -961,6 +961,7 @@ def process_feature(self, feat_name, single_feature_df, node_type): nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) ntype_feat_sizes.update({feat_name: nfeat_size}) + # pylint: disable=cell-var-from-loop self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = ( perf_counter() - 
node_transformation_start ) @@ -1364,9 +1365,10 @@ def process_feature(self, feat_name, single_feature_df, edge_type): edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None) - efeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) + efeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val) etype_feat_sizes.update({feat_name: efeat_size}) + # pylint: disable=cell-var-from-loop self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = ( perf_counter() - edge_feature_start ) From ae967da419f8833dd2ede361f24e1ab26088933c Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 22:58:06 +0000 Subject: [PATCH 15/38] add dependency --- .../gs-processing/developer/input-configuration.rst | 9 +++++++++ graphstorm-processing/pyproject.toml | 2 ++ 2 files changed, 11 insertions(+) diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 70e2da28ae..7fb5899457 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -447,6 +447,15 @@ arguments. will be considered as an array. For Parquet files, if the input type is ArrayType(StringType()), then the separator is ignored; if it is StringType(), it will apply same logic as in CSV. +- ``huggingface`` + + - Transforms a text feature column to tokens or embeddings with different Hugging Face models, enabling nuanced understanding and processing of natural language data. + - ``kwargs``: + + - ``normalizer`` (String, required): It should be "tokenize_hf". + - ``bert_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. + - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. 
+ -------------- Creating a graph for inference diff --git a/graphstorm-processing/pyproject.toml b/graphstorm-processing/pyproject.toml index 16a5749533..d02c9c2ba9 100644 --- a/graphstorm-processing/pyproject.toml +++ b/graphstorm-processing/pyproject.toml @@ -19,6 +19,8 @@ pandas = "^1.3.5" psutil = "^5.9.5" sagemaker = "^2.83.0" scipy = "^1.10.1" +transformers = "^4.37.1" +setuptools = "59.6.0" [tool.poetry.group.dev] optional = true From a81f4b55308988936386d315cf67cfc13aeda14b Mon Sep 17 00:00:00 2001 From: JalenCato Date: Fri, 26 Jan 2024 23:56:00 +0000 Subject: [PATCH 16/38] add --- .../dist_transformations/dist_hf_transformation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index be83f94a54..9577993c0e 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" - +from typing import Sequence from pyspark.sql import DataFrame from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField from pyspark.sql.functions import udf From 91818260f2052a541406def28c8173bde70df692 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Sat, 27 Jan 2024 00:25:47 +0000 Subject: [PATCH 17/38] test fix --- graphstorm-processing/tests/test_converter.py | 2 +- graphstorm-processing/tests/test_dist_heterogenous_loader.py | 2 ++ .../tests/test_dist_huggingface_transformation.py | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index 49f9d300a3..52721e5645 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -56,7 +56,7 @@ def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node node_dict["nodes"][0]["features"] = [ { "feature_col": ["paper_title"], - "transform": {"name": "tokenize_hf"}, + "transform": {"name": "bert_hf"}, } ] diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index bad1e0bb53..3e717ce1d3 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -239,6 +239,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade graphinfo_updates = { "nfeat_size": {"user": {"age": 1, "multi": 2}}, + "efeat_size": {}, "etype_label": [], "etype_label_property": [], "ntype_label": ["user"], @@ -278,6 +279,7 @@ def test_load_dist_hgl_without_labels( graphinfo_updates = { "nfeat_size": {}, + "efeat_size": {}, "task_type": "link_prediction", "etype_label": [], "etype_label_property": [], diff --git a/graphstorm-processing/tests/test_dist_huggingface_transformation.py b/graphstorm-processing/tests/test_dist_huggingface_transformation.py index 
7fab5d33fc..cd48d92b55 100644 --- a/graphstorm-processing/tests/test_dist_huggingface_transformation.py +++ b/graphstorm-processing/tests/test_dist_huggingface_transformation.py @@ -43,7 +43,7 @@ def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): # Initialize and apply the distributed Hugging Face tokenization transformation hf_tokenize = DistHFTransformation(["occupation"], "tokenize_hf", bert_model, max_seq_length) output_df = hf_tokenize.apply(input_df) - assert len(output_df) == 3, "the output for huggingface tokenize should have three columns" + assert len(output_df.columns) == 3, "the output for huggingface tokenize should have three columns" # Validate the schema of the transformed DataFrame for feature in ["input_ids", "attention_mask", "token_type_ids"]: From 93a9685691f5aeb13e526b78c4a447285717cc7f Mon Sep 17 00:00:00 2001 From: JalenCato Date: Mon, 29 Jan 2024 18:09:59 +0000 Subject: [PATCH 18/38] add fix --- .../graph_loaders/dist_heterogeneous_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index e4e3beda50..5e58379ec6 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -1355,7 +1355,7 @@ def process_feature(self, feat_name, single_feature_df, edge_type): "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path ) path_list = self._write_df( - single_feature_df, feature_output_path, out_format="parquet" + single_feature_df, feature_output_path, out_format=FORMAT_NAME ) edge_feature_metadata_dict = { From e091bf9966ac690d1e2b8cd31c2dec20a06ca194 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Mon, 29 Jan 2024 18:29:22 +0000 Subject: [PATCH 19/38] add test --- 
.../tests/test_dist_heterogenous_loader.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index 3e717ce1d3..1ebd7a2b0d 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -238,7 +238,8 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade metadata = json.load(mfile) graphinfo_updates = { - "nfeat_size": {"user": {"age": 1, "multi": 2}}, + "nfeat_size": {"user": {"age": 1, "attention_mask": 16, + "input_ids": 16, "token_type_ids": 16, "multi": 2}}, "efeat_size": {}, "etype_label": [], "etype_label_property": [], @@ -257,7 +258,8 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade verify_integ_test_output(metadata, dghl_loader, graphinfo_updates) expected_node_data = { - "user": {"gender", "train_mask", "val_mask", "test_mask", "age", "multi"}, + "user": {"gender", "train_mask", "val_mask", "test_mask", "age", + "multi", "input_ids", "attention_mask", "token_type_ids"}, } for node_type in metadata["node_data"]: From 69bee37ebb1721e0080dc079742d90cebe995ba3 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Mon, 29 Jan 2024 18:34:51 +0000 Subject: [PATCH 20/38] change config --- .../gsprocessing-config.json | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json index 1ea789b69b..660ec6688c 100644 --- a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json +++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json @@ -55,6 +55,17 @@ "separator": "|" } } + }, + { + "column": "occupation", + "transformation": { + "name": 
"huggingface", + "kwargs": { + "normalizer": "tokenize_hf", + "bert_model": "bert-base-uncased", + "max_seq_length":16 + } + } } ], "labels": [ From dfd63e44f9c03deae9ba9b3efb886afc7ef11771 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 18:04:33 +0000 Subject: [PATCH 21/38] apply comments --- .../config/hf_configs.py | 9 ++++--- .../dist_hf_transformation.py | 27 ++++++++++--------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index ba55f1ce14..6773777bf7 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -17,10 +17,11 @@ from typing import Mapping from .feature_config_base import FeatureConfig +from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE class HFConfig(FeatureConfig): - """Feature configuration for single-column numerical features. + """Feature configuration for huggingface text features. 
Supported kwargs ---------------- @@ -32,13 +33,15 @@ class HFConfig(FeatureConfig): def __init__(self, config: Mapping): super().__init__(config) - self.bert_model = self._transformation_kwargs.get("bert_model", "none") - self.max_seq_length = self._transformation_kwargs.get("max_seq_length", "none") + self.bert_norm = self._transformation_kwargs.get("normalizer") + self.bert_model = self._transformation_kwargs.get("bert_model") + self.max_seq_length = self._transformation_kwargs.get("max_seq_length") self._sanity_check() def _sanity_check(self) -> None: super()._sanity_check() + assert self.bert_norm in [HUGGINGFACE_TOKENIZE], "bert normalizer needs to be tokenize_hf" assert isinstance( self.bert_model, str ), f"Expect bert_model to be a string, but got {self.bert_model}" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index 9577993c0e..a3c5375455 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -13,19 +13,18 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from typing import Sequence +import numpy as np from pyspark.sql import DataFrame from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField from pyspark.sql.functions import udf - -import numpy as np from transformers import AutoTokenizer +from typing import Sequence from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE from .base_dist_transformation import DistributedTransformation -def apply_norm( +def apply_transform( cols: Sequence[str], bert_norm: str, bert_model: str, max_seq_length: int, input_df: DataFrame ) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns @@ -39,6 +38,8 @@ def apply_norm( The type of normalization to use. Valid values is "tokenize" bert_model : str The name of huggingface model. + max_seq_length: int + The maximal length of the tokenization results. input_df : DataFrame The input DataFrame to apply normalization to. """ @@ -81,13 +82,13 @@ def tokenize(text): return result # Apply the UDF to the DataFrame - scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) - scaled_df = scaled_df.select( - scaled_df[cols[0]].getItem("input_ids").alias("input_ids"), - scaled_df[cols[0]].getItem("attention_mask").alias("attention_mask"), - scaled_df[cols[0]].getItem("token_type_ids").alias("token_type_ids"), + transformed_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]])) + transformed_df = transformed_df.select( + transformed_df[cols[0]].getItem("input_ids").alias("input_ids"), + transformed_df[cols[0]].getItem("attention_mask").alias("attention_mask"), + transformed_df[cols[0]].getItem("token_type_ids").alias("token_type_ids"), ) - return scaled_df + return transformed_df class DistHFTransformation(DistributedTransformation): @@ -106,17 +107,17 @@ def __init__( ) -> None: super().__init__(cols) self.cols = cols - assert len(self.cols) == 1, "Bert transformation only supports single column" + assert len(self.cols) == 1, "Huggingface 
transformation only supports single column" self.bert_norm = normalizer self.bert_model = bert_model self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: - scaled_df = apply_norm( + transformed_df = apply_transform( self.cols, self.bert_norm, self.bert_model, self.max_seq_length, input_df ) - return scaled_df + return transformed_df @staticmethod def get_transformation_name() -> str: From 250938a0bab55d486ab153bcb3cfe9713920d0dd Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 18:18:06 +0000 Subject: [PATCH 22/38] apply comment --- .../dist_heterogeneous_loader.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 5e58379ec6..88d00a55ba 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -940,7 +940,7 @@ def _process_node_features( transformed_feature_df = transformer.apply_transformation(nodes_df) # TODO: Remove hack with [feat_conf.feat_name] - def process_feature(self, feat_name, single_feature_df, node_type): + def process_feature(self, feat_name, single_feature_df, node_type, transformer_name): feature_output_path = os.path.join( self.output_prefix, f"node_data/{node_type}-{feat_name}" ) @@ -961,8 +961,7 @@ def process_feature(self, feat_name, single_feature_df, node_type): nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val) ntype_feat_sizes.update({feat_name: nfeat_size}) - # pylint: disable=cell-var-from-loop - self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = ( + self.timers[f"{transformer_name}-{node_type}-{feat_name}"] = ( perf_counter() - node_transformation_start ) @@ -975,12 +974,14 @@ def 
process_feature(self, feat_name, single_feature_df, node_type): ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) - process_feature(self, bert_feat_name, single_feature_df, node_type) + process_feature(self, bert_feat_name, single_feature_df, node_type, + transformer.get_transformation_name()) else: single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( feat_col, feat_name ) - process_feature(self, feat_name, single_feature_df, node_type) + process_feature(self, feat_name, single_feature_df, node_type, + transformer.get_transformation_name()) return node_type_feature_metadata, ntype_feat_sizes def _process_node_labels( @@ -1347,7 +1348,8 @@ def _process_edge_features( transformed_feature_df = transformer.apply_transformation(edges_df) # TODO: Remove hack with [feat_conf.feat_name] - def process_feature(self, feat_name, single_feature_df, edge_type): + def process_feature(self, feat_name, single_feature_df, edge_type, + transformer_name): feature_output_path = os.path.join( self.output_prefix, f"edge_data/{edge_type}-{feat_name}" ) @@ -1368,8 +1370,7 @@ def process_feature(self, feat_name, single_feature_df, edge_type): efeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val) etype_feat_sizes.update({feat_name: efeat_size}) - # pylint: disable=cell-var-from-loop - self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = ( + self.timers[f"{transformer_name}-{edge_type}-{feat_name}"] = ( perf_counter() - edge_feature_start ) @@ -1382,12 +1383,14 @@ def process_feature(self, feat_name, single_feature_df, edge_type): ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) - process_feature(self, bert_feat_name, single_feature_df, edge_type) + process_feature(self, bert_feat_name, single_feature_df, edge_type, + 
transformer.get_transformation_name()) else: single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( feat_col, feat_name ) - process_feature(self, feat_name, single_feature_df, edge_type) + process_feature(self, feat_name, single_feature_df, edge_type, + transformer.get_transformation_name()) return edge_feature_metadata_dicts, etype_feat_sizes From efb1bfabea79c618a7fefe0cf29e96c79f157c9c Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 18:30:53 +0000 Subject: [PATCH 23/38] apply lint --- .../graphstorm_processing/config/hf_configs.py | 2 +- .../dist_transformations/dist_hf_transformation.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index 6773777bf7..f06412c94a 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -16,8 +16,8 @@ from typing import Mapping -from .feature_config_base import FeatureConfig from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE +from .feature_config_base import FeatureConfig class HFConfig(FeatureConfig): diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index a3c5375455..9c5d8d581a 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -13,12 +13,12 @@ See the License for the specific language governing permissions and limitations under the License. 
""" +from typing import Sequence import numpy as np from pyspark.sql import DataFrame from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField from pyspark.sql.functions import udf from transformers import AutoTokenizer -from typing import Sequence from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE from .base_dist_transformation import DistributedTransformation From 6d135719f9d5c0794748b1947ff112fcabd531bd Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 20:13:11 +0000 Subject: [PATCH 24/38] add final line --- graphstorm-processing/tests/test_converter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index 52721e5645..bbee6a6f7c 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -362,4 +362,5 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "type": "classification", "split_rate": {"train": 0.9, "val": 0.1, "test": 0.0}, }, - ] \ No newline at end of file + ] + \ No newline at end of file From 3d8540769ace2eae73be81bc2a1e73ad945955f8 Mon Sep 17 00:00:00 2001 From: jalencato Date: Wed, 31 Jan 2024 12:15:56 -0800 Subject: [PATCH 25/38] Update docs/source/gs-processing/developer/input-configuration.rst Co-authored-by: Theodore Vasiloudis --- docs/source/gs-processing/developer/input-configuration.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 7fb5899457..13657ab1fb 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -452,7 +452,7 @@ arguments. 
- Transforms a text feature column to tokens or embeddings with different Hugging Face models, enabling nuanced understanding and processing of natural language data. - ``kwargs``: - - ``normalizer`` (String, required): It should be "tokenize_hf". + - ``action`` (String, required): The action to perform on the text data. Currently we only support text tokenization through HuggingFace models, so the only accepted value here is "tokenize_hf". - ``bert_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. From 89924c6ce110e101d461b2f81c24cc23ee986825 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 20:56:07 +0000 Subject: [PATCH 26/38] name change --- .../config_conversion/gconstruct_converter.py | 2 +- .../config/hf_configs.py | 7 ++++-- .../dist_hf_transformation.py | 23 ++++++++++++------- .../dist_heterogeneous_loader.py | 4 ++-- .../gsprocessing-config.json | 2 +- graphstorm-processing/tests/test_converter.py | 2 +- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index 4c772fb9b2..8df036a68a 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -138,7 +138,7 @@ def _convert_feature(feats: list[dict]) -> list[dict]: elif gconstruct_transform_dict["name"] == "tokenize_hf": gsp_transformation_dict["name"] = "huggingface" gsp_transformation_dict["kwargs"] = { - "normalizer": "tokenize_hf", + "action": "tokenize_hf", "bert_model": gconstruct_transform_dict["bert_model"], "max_seq_length": gconstruct_transform_dict["max_seq_length"], } diff 
--git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index f06412c94a..1cdc6e2510 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -25,6 +25,8 @@ class HFConfig(FeatureConfig): Supported kwargs ---------------- + action: str, required + The type of huggingface action to use. Valid values is "tokenize_hf" bert_model: str, required The name of the lm model. max_seq_length: int, required @@ -33,7 +35,7 @@ class HFConfig(FeatureConfig): def __init__(self, config: Mapping): super().__init__(config) - self.bert_norm = self._transformation_kwargs.get("normalizer") + self.action = self._transformation_kwargs.get("action") self.bert_model = self._transformation_kwargs.get("bert_model") self.max_seq_length = self._transformation_kwargs.get("max_seq_length") @@ -41,7 +43,8 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() - assert self.bert_norm in [HUGGINGFACE_TOKENIZE], "bert normalizer needs to be tokenize_hf" + assert self.action in [HUGGINGFACE_TOKENIZE], \ + f"huggingface action needs to be {HUGGINGFACE_TOKENIZE}" assert isinstance( self.bert_model, str ), f"Expect bert_model to be a string, but got {self.bert_model}" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index 9c5d8d581a..fb18099308 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -25,7 +25,7 @@ def apply_transform( - cols: Sequence[str], bert_norm: str, bert_model: str, max_seq_length: int, 
input_df: DataFrame + cols: Sequence[str], action: str, bert_model: str, max_seq_length: int, input_df: DataFrame ) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns provided in the cols argument. @@ -34,7 +34,7 @@ def apply_transform( ---------- cols : Sequence[str] List of column names to apply normalization to. - bert_norm : str + action : str The type of normalization to use. Valid values is "tokenize" bert_model : str The name of huggingface model. @@ -44,7 +44,7 @@ def apply_transform( The input DataFrame to apply normalization to. """ - if bert_norm == HUGGINGFACE_TOKENIZE: + if action == HUGGINGFACE_TOKENIZE: # Initialize the tokenizer tokenizer = AutoTokenizer.from_pretrained(bert_model) @@ -88,6 +88,9 @@ def tokenize(text): transformed_df[cols[0]].getItem("attention_mask").alias("attention_mask"), transformed_df[cols[0]].getItem("token_type_ids").alias("token_type_ids"), ) + else: + raise ValueError(f"The input action needs to be {HUGGINGFACE_TOKENIZE}") + return transformed_df @@ -98,23 +101,27 @@ class DistHFTransformation(DistributedTransformation): ---------- cols : Sequence[str] List of column names to apply normalization to. - bert_norm : str - The type of normalization to use. Valid values is "tokenize" + action : str + The type of huggingface action to use. Valid values is "tokenize" + bert_model: str, required + The name of the lm model. + max_seq_length: int, required + The maximal length of the tokenization results. 
""" def __init__( - self, cols: Sequence[str], normalizer: str, bert_model: str, max_seq_length: int + self, cols: Sequence[str], action: str, bert_model: str, max_seq_length: int ) -> None: super().__init__(cols) self.cols = cols assert len(self.cols) == 1, "Huggingface transformation only supports single column" - self.bert_norm = normalizer + self.action = action self.bert_model = bert_model self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: transformed_df = apply_transform( - self.cols, self.bert_norm, self.bert_model, self.max_seq_length, input_df + self.cols, self.action, self.bert_model, self.max_seq_length, input_df ) return transformed_df diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 88d00a55ba..17d31d45a5 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -970,7 +970,7 @@ def process_feature(self, feat_name, single_feature_df, node_type, transformer_n if ( feat_conf.feat_type == HUGGINGFACE_TRANFORM - and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE + and feat_conf.transformation_kwargs["action"] == HUGGINGFACE_TOKENIZE ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) @@ -1379,7 +1379,7 @@ def process_feature(self, feat_name, single_feature_df, edge_type, if ( feat_conf.feat_type == HUGGINGFACE_TRANFORM - and feat_conf.transformation_kwargs["normalizer"] == HUGGINGFACE_TOKENIZE + and feat_conf.transformation_kwargs["action"] == HUGGINGFACE_TOKENIZE ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) diff --git 
a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json index 660ec6688c..6b7a8d5b9e 100644 --- a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json +++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json @@ -61,7 +61,7 @@ "transformation": { "name": "huggingface", "kwargs": { - "normalizer": "tokenize_hf", + "action": "tokenize_hf", "bert_model": "bert-base-uncased", "max_seq_length":16 } diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index bbee6a6f7c..1143b40113 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -329,7 +329,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "column": "citation_name", "transformation": { "name": "huggingface", - "kwargs": {"normalizer": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, + "kwargs": {"action": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, }, }, ] From 4ec8563eaecf11f6ca31c49412366ac0148ab3fe Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 22:58:55 +0000 Subject: [PATCH 27/38] add build docker' --- .../docker/0.2.1/emr-serverless/Dockerfile.cpu | 10 ++++++++++ .../docker/build_gsprocessing_image.sh | 12 ++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu index 8ef9d7bca6..7c43557801 100644 --- a/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu @@ -47,6 +47,16 @@ RUN pip install -r /usr/lib/spark/code/requirements.txt \ # GSProcessing codebase COPY code/ /usr/lib/spark/code/ +# Install Hugging Face 
model cache if it is necessary +ARG MODEL="" +ENV TRANSFORMERS_CACHE=/home/hadoop/.cache/huggingface/hub +RUN if [ $MODEL == "" ]; then \ + echo "Skip installing model cache"; \ +else \ + echo "Installing model cache for $MODEL" && \ + python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ +fi + FROM runtime AS prod RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \ rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache diff --git a/graphstorm-processing/docker/build_gsprocessing_image.sh b/graphstorm-processing/docker/build_gsprocessing_image.sh index 4f5de008b5..3fd8803615 100644 --- a/graphstorm-processing/docker/build_gsprocessing_image.sh +++ b/graphstorm-processing/docker/build_gsprocessing_image.sh @@ -23,7 +23,9 @@ Available options: -i, --image Docker image name, default is 'graphstorm-processing'. -v, --version Docker version tag, default is the library's current version (`poetry version --short`) -s, --suffix Suffix for the image tag, can be used to push custom image tags. Default is "". --b, --build Docker build directory, default is '/tmp/` +-b, --build Docker build directory, default is '/tmp/'. +-m, --model Huggingface Model name that needs to be packed into the docker image. Default is "". 
+ EOF exit } @@ -48,6 +50,7 @@ parse_params() { TARGET='test' ARCH='x86_64' SUFFIX="" + MODEL="" while :; do case "${1-}" in @@ -86,6 +89,10 @@ parse_params() { SUFFIX="${2-}" shift ;; + -m | --MODEL) + MODEL="${2-}" + shift + ;; -?*) die "Unknown option: $1" ;; *) break ;; esac @@ -135,6 +142,7 @@ msg "- GSP_HOME: ${GSP_HOME}" msg "- IMAGE_NAME: ${IMAGE_NAME}" msg "- VERSION: ${VERSION}" msg "- SUFFIX: ${SUFFIX}" +msg "- MODEL: ${MODEL}" # Prepare Docker build directory rm -rf "${BUILD_DIR}/docker/code" @@ -170,4 +178,4 @@ fi echo "Build a Docker image ${DOCKER_FULLNAME}" DOCKER_BUILDKIT=1 docker build --platform "linux/${ARCH}" -f "${GSP_HOME}/docker/${VERSION}/${EXEC_ENV}/Dockerfile.cpu" \ - "${BUILD_DIR}/docker/" -t $DOCKER_FULLNAME --target ${TARGET} --build-arg ARCH=${ARCH} + "${BUILD_DIR}/docker/" -t $DOCKER_FULLNAME --target ${TARGET} --build-arg ARCH=${ARCH} --build-arg MODEL=${MODEL} From 966e6fd70aea43c2c50ef46645ea3e74d73aa698 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 23:06:46 +0000 Subject: [PATCH 28/38] add doc --- .../usage/distributed-processing-setup.rst | 12 +++++++++++- docs/source/gs-processing/usage/emr-serverless.rst | 3 +++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index 12ecd88d56..8f5ac559f3 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -111,6 +111,16 @@ The script also supports other arguments to customize the image name, tag and other aspects of the build. See ``bash docker/build_gsprocessing_image.sh --help`` for more information. 
+Building the GraphStorm Processing image using Docker +----------------------------------------------------- +For EMR Serverless images, it is required to setup VPC and NAT route `Create a VPC on emr-serverless +`_ or adding huggingface +model cache inside the docker image. The ``build_gsprocessing_image.sh`` script can also offer a choice to +pack the model cache inside the docker image with the bert model name. +.. code-block:: bash + + bash docker/build_gsprocessing_image.sh --environment emr-serverless --model-name bert + Support for arm64 architecture ------------------------------ @@ -157,7 +167,7 @@ To build an EMR Serverless GSProcessing image for the ``arm64`` architecture you .. code-block:: bash - bash docker/build_gsprocessing_image.sh --environment sagemaker --architecture arm64 + bash docker/build_gsprocessing_image.sh --environment emr-serverless --architecture arm64 .. note:: diff --git a/docs/source/gs-processing/usage/emr-serverless.rst b/docs/source/gs-processing/usage/emr-serverless.rst index 35b54e9f1d..85effb6f19 100644 --- a/docs/source/gs-processing/usage/emr-serverless.rst +++ b/docs/source/gs-processing/usage/emr-serverless.rst @@ -98,6 +98,9 @@ Here you will need to replace ````, ```` (``x86_64`` or `` from the image you just created. GSProcessing version ``0.2.1`` uses ``emr-6.13.0`` as its base image, so we need to ensure our application uses the same release. 
+Additionally, if it is required to use text feature transformation, it is suggested to setup VPC and NAT route for the emr cluster: +`Create a VPC on emr-serverless +`_ Allow EMR Serverless to access the custom image repository ---------------------------------------------------------- From aa43a8397929c1d121ef16e0ef00bfdc87069355 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 23:07:29 +0000 Subject: [PATCH 29/38] add doc --- .../source/gs-processing/usage/distributed-processing-setup.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index 8f5ac559f3..235790207a 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -119,7 +119,7 @@ model cache inside the docker image. The ``build_gsprocessing_image.sh`` script pack the model cache inside the docker image with the bert model name. .. 
code-block:: bash - bash docker/build_gsprocessing_image.sh --environment emr-serverless --model-name bert + bash docker/build_gsprocessing_image.sh --environment emr-serverless --model-name bert-base-uncased Support for arm64 architecture ------------------------------ From a2d36d5a2e78175047447e999b1c59ac68411a6a Mon Sep 17 00:00:00 2001 From: JalenCato Date: Wed, 31 Jan 2024 23:13:17 +0000 Subject: [PATCH 30/38] add doc --- .../usage/distributed-processing-setup.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index 235790207a..39fdbc37d3 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -111,12 +111,12 @@ The script also supports other arguments to customize the image name, tag and other aspects of the build. See ``bash docker/build_gsprocessing_image.sh --help`` for more information. -Building the GraphStorm Processing image using Docker ------------------------------------------------------ -For EMR Serverless images, it is required to setup VPC and NAT route `Create a VPC on emr-serverless -`_ or adding huggingface -model cache inside the docker image. The ``build_gsprocessing_image.sh`` script can also offer a choice to -pack the model cache inside the docker image with the bert model name. +For EMR Serverless images, setting up a VPC and NAT route is a necessary step when using text data feature transformation. +You can find detailed instructions on creating a VPC for EMR Serverless in the AWS documentation: `Create a VPC on emr-serverless +`_. +Alternatively, there is one easier way to do that, you can opt to include the huggingface model cache directly in your Docker image. 
+The build_gsprocessing_image.sh script provides an option to embed the huggingface bert model cache within the Docker image. + .. code-block:: bash bash docker/build_gsprocessing_image.sh --environment emr-serverless --model-name bert-base-uncased From 9cbf74e9dbb5c7bf09031bdb47d01c7ffa9cd02d Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 00:05:59 +0000 Subject: [PATCH 31/38] change dockerfile --- .../0.2.1/emr-serverless/Dockerfile.cpu | 10 --- .../0.2.2/emr-serverless/Dockerfile.cpu | 68 +++++++++++++++++++ .../docker/0.2.2/sagemaker/Dockerfile.cpu | 56 +++++++++++++++ 3 files changed, 124 insertions(+), 10 deletions(-) create mode 100644 graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu create mode 100644 graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu diff --git a/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu index 7c43557801..8ef9d7bca6 100644 --- a/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.1/emr-serverless/Dockerfile.cpu @@ -47,16 +47,6 @@ RUN pip install -r /usr/lib/spark/code/requirements.txt \ # GSProcessing codebase COPY code/ /usr/lib/spark/code/ -# Install Hugging Face model cache if it is necessary -ARG MODEL="" -ENV TRANSFORMERS_CACHE=/home/hadoop/.cache/huggingface/hub -RUN if [ $MODEL == "" ]; then \ - echo "Skip installing model cache"; \ -else \ - echo "Installing model cache for $MODEL" && \ - python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ -fi - FROM runtime AS prod RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \ rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache diff --git a/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu new file mode 100644 index 
0000000000..7c43557801 --- /dev/null +++ b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu @@ -0,0 +1,68 @@ +ARG ARCH=x86_64 +FROM public.ecr.aws/emr-serverless/spark/emr-6.13.0:20230906-${ARCH} as base +FROM base as runtime + +USER root +ENV PYTHON_VERSION=3.9.18 + +# Python won’t try to write .pyc or .pyo files on the import of source modules +# Force stdin, stdout and stderr to be totally unbuffered. Good for logging +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONIOENCODING=UTF-8 + +# Set up pyenv +ENV PYENV_ROOT="${HOME}/.pyenv" +ENV PATH="${PYENV_ROOT}/shims:${PYENV_ROOT}/bin:${PATH}" +ENV PYSPARK_DRIVER_PYTHON=${PYENV_ROOT}/shims/python +ENV PYSPARK_PYTHON=${PYENV_ROOT}/shims/python + +# TODO: These can probably all go to another builder stage? +RUN yum erase -y openssl-devel && \ + yum install -y \ + bzip2-devel\ + gcc \ + git \ + libffi-devel \ + ncurses-devel \ + openssl11-devel \ + readline-devel \ + sqlite-devel \ + sudo \ + xz-devel && \ + rm -rf /var/cache/yum +RUN git clone https://github.com/pyenv/pyenv.git ${PYENV_ROOT} && \ + pyenv install ${PYTHON_VERSION} && \ + pyenv global ${PYTHON_VERSION} + +WORKDIR /usr/lib/spark/code/ + +# Install GSProcessing requirements to pyenv Python +COPY requirements.txt requirements.txt +# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed: +# https://github.com/moby/buildkit/issues/1512 +RUN pip install -r /usr/lib/spark/code/requirements.txt \ + && rm -rf /root/.cache + +# GSProcessing codebase +COPY code/ /usr/lib/spark/code/ + +# Install Hugging Face model cache if it is necessary +ARG MODEL="" +ENV TRANSFORMERS_CACHE=/home/hadoop/.cache/huggingface/hub +RUN if [ $MODEL == "" ]; then \ + echo "Skip installing model cache"; \ +else \ + echo "Installing model cache for $MODEL" && \ + python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ +fi + +FROM runtime AS prod +RUN python -m pip install --no-deps 
/usr/lib/spark/code/graphstorm_processing-*.whl && \ + rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache + +FROM runtime AS test +RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache + +USER hadoop:hadoop +WORKDIR /home/hadoop diff --git a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu new file mode 100644 index 0000000000..13365347fd --- /dev/null +++ b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu @@ -0,0 +1,56 @@ +# syntax=docker/dockerfile:experimental +FROM 153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.4-cpu-py39-v1.0 AS base + +# Python won’t try to write .pyc or .pyo files on the import of source modules +# Force stdin, stdout and stderr to be totally unbuffered. Good for logging +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONIOENCODING=UTF-8 +ENV LANG=C.UTF-8 +ENV LC_ALL=C.UTF-8 +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib" +ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib" +ENV PATH=/opt/conda/bin:$PATH + +# Install GSProcessing requirements to pipenv Python +RUN pipenv install \ + boto3==1.28.38 \ + joblib==1.3.1 \ + mock==5.1.0 \ + pandas==1.3.5 \ + pip==23.1.2 \ + protobuf==3.20.3 \ + psutil==5.9.5 \ + pyarrow==13.0.0 \ + pyspark==3.4.1 \ + scipy==1.11.3 \ + setuptools \ + spacy==3.6.0 \ + wheel \ + && rm -rf /root/.cache +# Do a pipenv sync so our base libs are independent from our editable code, making them cacheable +RUN pipenv sync --system && python3 -m spacy download en_core_web_lg \ + && rm -rf /root/.cache + +# Graphloader codebase +COPY code/ /usr/lib/spark/code/ +WORKDIR /usr/lib/spark/code/ + +# Base container assumes this is the workdir +ENV SPARK_HOME /usr/lib/spark +WORKDIR $SPARK_HOME + +# Ensure our python3 installation is the one used +RUN echo 'alias python3=python3.9' >> ~/.bashrc + +# Starts framework 
+ENTRYPOINT ["bash", "/usr/lib/spark/code/docker-entry.sh"] + +FROM base AS prod +RUN python3 -m pip install /usr/lib/spark/code/graphstorm_processing-*.whl && \ + rm /usr/lib/spark/code/graphstorm_processing-*.whl +CMD ["gs-processing"] + +FROM base AS test +RUN python3 -m pip install /usr/lib/spark/code/graphstorm-processing/ +CMD ["sh", "-c", "pytest ./code/tests/"] From b702b30c073fa17a3d13327d9660fd265c36b0f0 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 00:27:45 +0000 Subject: [PATCH 32/38] add docker packing --- .../usage/distributed-processing-setup.rst | 2 ++ docs/source/gs-processing/usage/emr-serverless.rst | 3 ++- .../docker/0.2.2/emr-serverless/Dockerfile.cpu | 2 +- .../docker/0.2.2/sagemaker/Dockerfile.cpu | 10 ++++++++++ 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index 39fdbc37d3..b8316e57c0 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -115,10 +115,12 @@ For EMR Serverless images, setting up a VPC and NAT route is a necessary step wh You can find detailed instructions on creating a VPC for EMR Serverless in the AWS documentation: `Create a VPC on emr-serverless `_. Alternatively, there is one easier way to do that, you can opt to include the huggingface model cache directly in your Docker image. +It is available for both SageMaker docker image and EMR-serverless docker image. It is a good way to save cost to avoid downloading when launching the clusters. The build_gsprocessing_image.sh script provides an option to embed the huggingface bert model cache within the Docker image. .. 
code-block:: bash + bash docker/build_gsprocessing_image.sh --environment sagemaker --model-name bert-base-uncased bash docker/build_gsprocessing_image.sh --environment emr-serverless --model-name bert-base-uncased Support for arm64 architecture diff --git a/docs/source/gs-processing/usage/emr-serverless.rst b/docs/source/gs-processing/usage/emr-serverless.rst index 85effb6f19..5c869b5d24 100644 --- a/docs/source/gs-processing/usage/emr-serverless.rst +++ b/docs/source/gs-processing/usage/emr-serverless.rst @@ -100,7 +100,8 @@ base image, so we need to ensure our application uses the same release. Additionally, if it is required to use text feature transformation, it is suggested to setup VPC and NAT route for the emr cluster: `Create a VPC on emr-serverless -`_ +`_. If it is preferred to save model cache +inside the docker image, check out how we do that in: :doc:`distributed-processing-setup`. Allow EMR Serverless to access the custom image repository ---------------------------------------------------------- diff --git a/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu index 7c43557801..cf262f8f82 100644 --- a/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu @@ -47,7 +47,7 @@ RUN pip install -r /usr/lib/spark/code/requirements.txt \ # GSProcessing codebase COPY code/ /usr/lib/spark/code/ -# Install Hugging Face model cache if it is necessary +# Install Huggingface model cache if it is necessary ARG MODEL="" ENV TRANSFORMERS_CACHE=/home/hadoop/.cache/huggingface/hub RUN if [ $MODEL == "" ]; then \ diff --git a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu index 13365347fd..2eb0d8fb8b 100644 --- a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu 
@@ -43,6 +43,16 @@ WORKDIR $SPARK_HOME # Ensure our python3 installation is the one used RUN echo 'alias python3=python3.9' >> ~/.bashrc +# Install Huggingface model cache if it is necessary +ARG MODEL="" +ENV TRANSFORMERS_CACHE=/root/.cache/huggingface/hub +RUN if [ $MODEL == "" ]; then \ + echo "Skip installing model cache"; \ +else \ + echo "Installing model cache for $MODEL" && \ + python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ +fi + # Starts framework ENTRYPOINT ["bash", "/usr/lib/spark/code/docker-entry.sh"] From a67c66c0789b822cbbeb607d7a08956a2587f20f Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 00:28:55 +0000 Subject: [PATCH 33/38] doc --- .../source/gs-processing/usage/distributed-processing-setup.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index b8316e57c0..4534f76a06 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -115,7 +115,7 @@ For EMR Serverless images, setting up a VPC and NAT route is a necessary step wh You can find detailed instructions on creating a VPC for EMR Serverless in the AWS documentation: `Create a VPC on emr-serverless `_. Alternatively, there is one easier way to do that, you can opt to include the huggingface model cache directly in your Docker image. -It is available for both SageMaker docker image and EMR-serverless docker image. It is a good way to save cost to avoid downloading when launching the clusters. +It is available for both SageMaker docker image and EMR-serverless docker image. It is a good way to save cost as it avoids downloading when launching the clusters. The build_gsprocessing_image.sh script provides an option to embed the huggingface bert model cache within the Docker image. .. 
code-block:: bash From fe4929247dc8b16d5affc4817e9889f03e2412a8 Mon Sep 17 00:00:00 2001 From: jalencato Date: Thu, 1 Feb 2024 11:33:55 -0800 Subject: [PATCH 34/38] Apply suggestions from code review Co-authored-by: Theodore Vasiloudis --- .../gs-processing/usage/distributed-processing-setup.rst | 9 +++++---- .../dist_transformations/dist_hf_transformation.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst index 4534f76a06..053837f1d3 100644 --- a/docs/source/gs-processing/usage/distributed-processing-setup.rst +++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst @@ -111,12 +111,13 @@ The script also supports other arguments to customize the image name, tag and other aspects of the build. See ``bash docker/build_gsprocessing_image.sh --help`` for more information. -For EMR Serverless images, setting up a VPC and NAT route is a necessary step when using text data feature transformation. +If you plan to use text transformations that utilize Huggingface model, you can opt to include the Huggingface model cache directly in your Docker image. +The build_gsprocessing_image.sh script provides an option to embed the huggingface bert model cache within the Docker image, using the `--hf-model` argument. +You can do this for both the SageMaker docker image and EMR Serverless docker image. It is a good way to save cost as it avoids downloading models after launching the job. +If you'd rather download the Huggingface models at runtime, for EMR Serverless images, setting up a VPC and NAT route is a necessary. You can find detailed instructions on creating a VPC for EMR Serverless in the AWS documentation: `Create a VPC on emr-serverless `_. -Alternatively, there is one easier way to do that, you can opt to include the huggingface model cache directly in your Docker image. 
-It is available for both SageMaker docker image and EMR-serverless docker image. It is a good way to save cost as it avoids downloading when launching the clusters. -The build_gsprocessing_image.sh script provides an option to embed the huggingface bert model cache within the Docker image. + .. code-block:: bash diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index fb18099308..d8d94a4f68 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -35,7 +35,7 @@ def apply_transform( cols : Sequence[str] List of column names to apply normalization to. action : str - The type of normalization to use. Valid values is "tokenize" + The type of normalization to use. Currently we only accept the `tokenize_hf` action. bert_model : str The name of huggingface model. 
max_seq_length: int From 5c6fd8c4b35a4c0bf77a206c6e1935827192e86d Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 20:12:02 +0000 Subject: [PATCH 35/38] final version --- .../gs-processing/developer/input-configuration.rst | 5 +++++ docs/source/gs-processing/usage/emr-serverless.rst | 10 ++++++---- .../docker/0.2.2/sagemaker/Dockerfile.cpu | 1 + .../docker/build_gsprocessing_image.sh | 4 ++-- .../graph_loaders/dist_heterogeneous_loader.py | 4 ++-- graphstorm-processing/pyproject.toml | 3 +-- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 13657ab1fb..6ec1d1965d 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -453,6 +453,11 @@ arguments. - ``kwargs``: - ``action`` (String, required): The action to perform on the text data. Currently we only support text tokenization through HuggingFace models, so the only accepted value here is "tokenize_hf". + - ``tokenize_hf``: It tokenizes text strings with a HuggingFace tokenizer with a predefined tokenizer hosted on huggingface.co. The tokenizer_hf can use any HuggingFace LM models available in the huggingface repo. + Check more information on: `huggingface autotokenizer `_ + The expected input can any length of text strings, and the expected output will include ``input_ids`` for the token IDs for the input tex, + ``attention_mask`` for a mask to avoid performing attention on padding token indices, and ``token_type_ids`` used for segmenting two sentences in models. + The output here is compatible for graphstorm language model training or inference pipeline. - ``bert_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. 
diff --git a/docs/source/gs-processing/usage/emr-serverless.rst b/docs/source/gs-processing/usage/emr-serverless.rst index 5c869b5d24..919f62e7c1 100644 --- a/docs/source/gs-processing/usage/emr-serverless.rst +++ b/docs/source/gs-processing/usage/emr-serverless.rst @@ -98,10 +98,12 @@ Here you will need to replace ````, ```` (``x86_64`` or `` from the image you just created. GSProcessing version ``0.2.1`` uses ``emr-6.13.0`` as its base image, so we need to ensure our application uses the same release. -Additionally, if it is required to use text feature transformation, it is suggested to setup VPC and NAT route for the emr cluster: -`Create a VPC on emr-serverless -`_. If it is preferred to save model cache -inside the docker image, check out how we do that in: :doc:`distributed-processing-setup`. +Additionally, if it is required to use text feature transformation, it is suggested to download the model cache inside the emr-serverless +docker image: :doc:`distributed-processing-setup` to save cost and time. Please note that the maximum size for docker images in EMR Serverless is limited to 5GB: +`EMR Serverless Considerations and Limitations +`_. 
+ + Allow EMR Serverless to access the custom image repository ---------------------------------------------------------- diff --git a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu index 2eb0d8fb8b..11ee72acda 100644 --- a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu @@ -25,6 +25,7 @@ RUN pipenv install \ pyspark==3.4.1 \ scipy==1.11.3 \ setuptools \ + transformers==4.37.1 \ spacy==3.6.0 \ wheel \ && rm -rf /root/.cache diff --git a/graphstorm-processing/docker/build_gsprocessing_image.sh b/graphstorm-processing/docker/build_gsprocessing_image.sh index 3fd8803615..d2b71e43da 100644 --- a/graphstorm-processing/docker/build_gsprocessing_image.sh +++ b/graphstorm-processing/docker/build_gsprocessing_image.sh @@ -24,7 +24,7 @@ Available options: -v, --version Docker version tag, default is the library's current version (`poetry version --short`) -s, --suffix Suffix for the image tag, can be used to push custom image tags. Default is "". -b, --build Docker build directory, default is '/tmp/'. --m, --model Huggingface Model name that needs to be packed into the docker image. Default is "". +-m, --hf-model Huggingface Model name that needs to be packed into the docker image. Default is "". 
EOF exit @@ -89,7 +89,7 @@ parse_params() { SUFFIX="${2-}" shift ;; - -m | --MODEL) + -m | --hf-model) MODEL="${2-}" shift ;; diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 17d31d45a5..16506919ee 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -939,7 +939,6 @@ def _process_node_features( transformed_feature_df = transformer.apply_transformation(nodes_df) - # TODO: Remove hack with [feat_conf.feat_name] def process_feature(self, feat_name, single_feature_df, node_type, transformer_name): feature_output_path = os.path.join( self.output_prefix, f"node_data/{node_type}-{feat_name}" @@ -965,6 +964,7 @@ def process_feature(self, feat_name, single_feature_df, node_type, transformer_n perf_counter() - node_transformation_start ) + # TODO: Remove hack with [feat_conf.feat_name] for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): node_transformation_start = perf_counter() @@ -1347,7 +1347,6 @@ def _process_edge_features( transformed_feature_df = transformer.apply_transformation(edges_df) - # TODO: Remove hack with [feat_conf.feat_name] def process_feature(self, feat_name, single_feature_df, edge_type, transformer_name): feature_output_path = os.path.join( @@ -1374,6 +1373,7 @@ def process_feature(self, feat_name, single_feature_df, edge_type, perf_counter() - edge_feature_start ) + # TODO: Remove hack with [feat_conf.feat_name] for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols): edge_feature_start = perf_counter() diff --git a/graphstorm-processing/pyproject.toml b/graphstorm-processing/pyproject.toml index d02c9c2ba9..165706aa97 100644 --- a/graphstorm-processing/pyproject.toml +++ b/graphstorm-processing/pyproject.toml @@ -1,6 +1,6 @@ 
[tool.poetry] name = "graphstorm_processing" -version = "0.2.1" +version = "0.2.2" description = "Distributed graph pre-processing for GraphStorm" readme = "README.md" packages = [{include = "graphstorm_processing"}] @@ -20,7 +20,6 @@ psutil = "^5.9.5" sagemaker = "^2.83.0" scipy = "^1.10.1" transformers = "^4.37.1" -setuptools = "59.6.0" [tool.poetry.group.dev] optional = true From b09d05874a961c6f48e3b851faebd52aae1c4983 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 20:21:54 +0000 Subject: [PATCH 36/38] apply black --- .../config/config_parser.py | 1 + .../config/hf_configs.py | 5 ++- .../dist_transformations/__init__.py | 1 + .../dist_hf_transformation.py | 1 + .../dist_heterogeneous_loader.py | 39 ++++++++++++++----- .../test_dist_huggingface_transformation.py | 4 +- 6 files changed, 38 insertions(+), 13 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index 0caf3f26d2..38e92528d8 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -15,6 +15,7 @@ Configuration parsing for edges and nodes """ + from abc import ABC from typing import Any, Dict, List, Optional, Sequence diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index 1cdc6e2510..77cbd13f17 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -43,8 +43,9 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() - assert self.action in [HUGGINGFACE_TOKENIZE], \ - f"huggingface action needs to be {HUGGINGFACE_TOKENIZE}" + assert self.action in [ + HUGGINGFACE_TOKENIZE + ], f"huggingface action needs to be {HUGGINGFACE_TOKENIZE}" assert isinstance( 
self.bert_model, str ), f"Expect bert_model to be a string, but got {self.bert_model}" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py index a862ef967a..4849c53acc 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py @@ -1,6 +1,7 @@ """ Implementations for the various distributed transformations. """ + from .base_dist_transformation import DistributedTransformation from .dist_category_transformation import ( DistCategoryTransformation, diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index d8d94a4f68..20ddecd1f0 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" + from typing import Sequence import numpy as np from pyspark.sql import DataFrame diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 16506919ee..e7b926058d 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -974,14 +974,24 @@ def process_feature(self, feat_name, single_feature_df, node_type, transformer_n ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = transformed_feature_df.select(bert_feat_name) - process_feature(self, bert_feat_name, single_feature_df, node_type, - transformer.get_transformation_name()) + process_feature( + self, + bert_feat_name, + single_feature_df, + node_type, + transformer.get_transformation_name(), + ) else: single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( feat_col, feat_name ) - process_feature(self, feat_name, single_feature_df, node_type, - transformer.get_transformation_name()) + process_feature( + self, + feat_name, + single_feature_df, + node_type, + transformer.get_transformation_name(), + ) return node_type_feature_metadata, ntype_feat_sizes def _process_node_labels( @@ -1347,8 +1357,7 @@ def _process_edge_features( transformed_feature_df = transformer.apply_transformation(edges_df) - def process_feature(self, feat_name, single_feature_df, edge_type, - transformer_name): + def process_feature(self, feat_name, single_feature_df, edge_type, transformer_name): feature_output_path = os.path.join( self.output_prefix, f"edge_data/{edge_type}-{feat_name}" ) @@ -1383,14 +1392,24 @@ def process_feature(self, feat_name, single_feature_df, edge_type, ): for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]: single_feature_df = 
transformed_feature_df.select(bert_feat_name) - process_feature(self, bert_feat_name, single_feature_df, edge_type, - transformer.get_transformation_name()) + process_feature( + self, + bert_feat_name, + single_feature_df, + edge_type, + transformer.get_transformation_name(), + ) else: single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed( feat_col, feat_name ) - process_feature(self, feat_name, single_feature_df, edge_type, - transformer.get_transformation_name()) + process_feature( + self, + feat_name, + single_feature_df, + edge_type, + transformer.get_transformation_name(), + ) return edge_feature_metadata_dicts, etype_feat_sizes diff --git a/graphstorm-processing/tests/test_dist_huggingface_transformation.py b/graphstorm-processing/tests/test_dist_huggingface_transformation.py index cd48d92b55..0982f46cf0 100644 --- a/graphstorm-processing/tests/test_dist_huggingface_transformation.py +++ b/graphstorm-processing/tests/test_dist_huggingface_transformation.py @@ -43,7 +43,9 @@ def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): # Initialize and apply the distributed Hugging Face tokenization transformation hf_tokenize = DistHFTransformation(["occupation"], "tokenize_hf", bert_model, max_seq_length) output_df = hf_tokenize.apply(input_df) - assert len(output_df.columns) == 3, "the output for huggingface tokenize should have three columns" + assert ( + len(output_df.columns) == 3 + ), "the output for huggingface tokenize should have three columns" # Validate the schema of the transformed DataFrame for feature in ["input_ids", "attention_mask", "token_type_ids"]: From 9d1a234e18f544df06afe1389a71c189b6a24b5a Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 20:30:27 +0000 Subject: [PATCH 37/38] doc --- docs/source/gs-processing/developer/input-configuration.rst | 6 +++--- docs/source/gs-processing/usage/emr-serverless.rst | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 6ec1d1965d..67351767f8 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -455,9 +455,9 @@ arguments. - ``action`` (String, required): The action to perform on the text data. Currently we only support text tokenization through HuggingFace models, so the only accepted value here is "tokenize_hf". - ``tokenize_hf``: It tokenizes text strings with a HuggingFace tokenizer with a predefined tokenizer hosted on huggingface.co. The tokenizer_hf can use any HuggingFace LM models available in the huggingface repo. Check more information on: `huggingface autotokenizer `_ - The expected input can any length of text strings, and the expected output will include ``input_ids`` for the token IDs for the input tex, - ``attention_mask`` for a mask to avoid performing attention on padding token indices, and ``token_type_ids`` used for segmenting two sentences in models. - The output here is compatible for graphstorm language model training or inference pipeline. + The expected input can be text strings of any length, and the expected output will include ``input_ids`` for token IDs on the input text, + ``attention_mask`` for a mask to avoid performing attention on padding token indices, and ``token_type_ids`` for segmenting two sentences in models. + The output here is compatible with graphstorm language model training and inference pipelines. - ``bert_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. 
diff --git a/docs/source/gs-processing/usage/emr-serverless.rst b/docs/source/gs-processing/usage/emr-serverless.rst index 919f62e7c1..e399b9872e 100644 --- a/docs/source/gs-processing/usage/emr-serverless.rst +++ b/docs/source/gs-processing/usage/emr-serverless.rst @@ -98,7 +98,7 @@ Here you will need to replace ````, ```` (``x86_64`` or `` from the image you just created. GSProcessing version ``0.2.1`` uses ``emr-6.13.0`` as its base image, so we need to ensure our application uses the same release. -Additionally, if it is required to use text feature transformation, it is suggested to download the model cache inside the emr-serverless +Additionally, if it is required to use text feature transformation with a Huggingface model, it is suggested to download the model cache inside the emr-serverless docker image: :doc:`distributed-processing-setup` to save cost and time. Please note that the maximum size for docker images in EMR Serverless is limited to 5GB: `EMR Serverless Considerations and Limitations `_. From 62472972b2a5946ada6f473c4ddc9ae283578a40 Mon Sep 17 00:00:00 2001 From: JalenCato Date: Thu, 1 Feb 2024 23:25:35 +0000 Subject: [PATCH 38/38] convert --- graphstorm-processing/tests/test_converter.py | 8 ++++-- .../tests/test_dist_heterogenous_loader.py | 25 ++++++++++++++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index 1143b40113..a910dc2622 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" + import pytest from graphstorm_processing.config.config_conversion import ( @@ -237,7 +238,11 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): }, { "feature_col": ["citation_name"], - "transform": {"name": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, + "transform": { + "name": "tokenize_hf", + "bert_model": "bert", + "max_seq_length": 64, + }, }, ], "labels": [ @@ -363,4 +368,3 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "split_rate": {"train": 0.9, "val": 0.1, "test": 0.0}, }, ] - \ No newline at end of file diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index 1ebd7a2b0d..b0c8ae2545 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" + from typing import Any, Dict, List, Tuple import json import os @@ -238,8 +239,15 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade metadata = json.load(mfile) graphinfo_updates = { - "nfeat_size": {"user": {"age": 1, "attention_mask": 16, - "input_ids": 16, "token_type_ids": 16, "multi": 2}}, + "nfeat_size": { + "user": { + "age": 1, + "attention_mask": 16, + "input_ids": 16, + "token_type_ids": 16, + "multi": 2, + } + }, "efeat_size": {}, "etype_label": [], "etype_label_property": [], @@ -258,8 +266,17 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade verify_integ_test_output(metadata, dghl_loader, graphinfo_updates) expected_node_data = { - "user": {"gender", "train_mask", "val_mask", "test_mask", "age", - "multi", "input_ids", "attention_mask", "token_type_ids"}, + "user": { + "gender", + "train_mask", + "val_mask", + "test_mask", + "age", + "multi", + "input_ids", + "attention_mask", + "token_type_ids", + }, } for node_type in metadata["node_data"]: