From 2f4f105d790293e15666df86a6ccdd49fb1415bd Mon Sep 17 00:00:00 2001 From: jalencato Date: Fri, 9 Feb 2024 11:57:02 -0800 Subject: [PATCH] [GSProcessing] Bert Embedding (#724) *Issue #, if available:* *Description of changes:* By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- .../developer/input-configuration.rst | 13 ++- .../0.2.2/emr-serverless/Dockerfile.cpu | 1 + .../docker/0.2.2/sagemaker/Dockerfile.cpu | 2 + .../config_conversion/gconstruct_converter.py | 9 +- .../config/hf_configs.py | 19 ++-- .../graphstorm_processing/constants.py | 1 + .../dist_hf_transformation.py | 87 ++++++++++++++++--- graphstorm-processing/pyproject.toml | 8 +- .../gsprocessing-config.json | 2 +- graphstorm-processing/tests/test_converter.py | 19 +++- .../test_dist_huggingface_transformation.py | 67 +++++++++++++- 11 files changed, 193 insertions(+), 35 deletions(-) diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst index 67351767f8..386c5d58d2 100644 --- a/docs/source/gs-processing/developer/input-configuration.rst +++ b/docs/source/gs-processing/developer/input-configuration.rst @@ -452,14 +452,21 @@ arguments. - Transforms a text feature column to tokens or embeddings with different Hugging Face models, enabling nuanced understanding and processing of natural language data. - ``kwargs``: - - ``action`` (String, required): The action to perform on the text data. Currently we only support text tokenization through HuggingFace models, so the only accepted value here is "tokenize_hf". + - ``action`` (String, required): Currently we support embedding creation using HuggingFace models, where the input text is transformed to a vector representation, + or tokenization of text the using using HuggingFace tokenizers, where the output is a tokenized version of the text to be used downstream as input to a Huggingface model during training. + - ``tokenize_hf``: It tokenizes text strings with a HuggingFace tokenizer with a predefined tokenizer hosted on huggingface.co. The tokenizer_hf can use any HuggingFace LM models available in the huggingface repo. Check more information on: `huggingface autotokenizer `_ The expected input can any length of text strings, and the expected output will include ``input_ids`` for token IDs on the input text, ``attention_mask`` for a mask to avoid performing attention on padding token indices, and ``token_type_ids`` for segmenting two sentences in models. The output here is compatible for graphstorm language model training and inference pipelines. - - ``bert_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. - - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. + + - ``embedding_hf``: It encodes text strings with a HuggingFace model hosted on huggingface.co. The value can be any HuggingFace language model available in the + `Huggingface model repository `_, e.g. `bert-base-uncased`. + The expected input can any length of text strings, and the expected output will be the embeddings for the text strings. + - ``hf_model`` (String, required): It should be the identifier of a pre-trained model available in the Hugging Face Model Hub. + Check the model list on `Huggingface model repository `_. + - ``max_seq_length`` (Integer, required): It specifies the maximum number of tokens of the input. Use a length greater than the dataset's longest sentence; if not, choose 128. -------------- diff --git a/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu index a99247dece..b256975673 100644 --- a/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.2/emr-serverless/Dockerfile.cpu @@ -55,6 +55,7 @@ RUN if [ -z "${MODEL}" ]; then \ else \ echo "Installing model cache for $MODEL" && \ python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ + python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \ fi FROM runtime AS prod diff --git a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu index a41a49f202..ae9f873762 100644 --- a/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.2.2/sagemaker/Dockerfile.cpu @@ -27,6 +27,7 @@ RUN pipenv install \ setuptools \ transformers==4.37.1 \ spacy==3.6.0 \ + torch==2.1.0 \ wheel \ && rm -rf /root/.cache # Do a pipenv sync so our base libs are independent from our editable code, making them cacheable @@ -52,6 +53,7 @@ RUN if [ -z "${MODEL}" ]; then \ else \ echo "Installing model cache for $MODEL" && \ python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \ + python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \ fi # Starts framework diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py index 8df036a68a..c49c14dfc0 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py +++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py @@ -139,7 +139,14 @@ def _convert_feature(feats: list[dict]) -> list[dict]: gsp_transformation_dict["name"] = "huggingface" gsp_transformation_dict["kwargs"] = { "action": "tokenize_hf", - "bert_model": gconstruct_transform_dict["bert_model"], + "hf_model": gconstruct_transform_dict["bert_model"], + "max_seq_length": gconstruct_transform_dict["max_seq_length"], + } + elif gconstruct_transform_dict["name"] == "bert_hf": + gsp_transformation_dict["name"] = "huggingface" + gsp_transformation_dict["kwargs"] = { + "action": "embedding_hf", + "hf_model": gconstruct_transform_dict["bert_model"], "max_seq_length": gconstruct_transform_dict["max_seq_length"], } # TODO: Add support for other common transformations here diff --git a/graphstorm-processing/graphstorm_processing/config/hf_configs.py b/graphstorm-processing/graphstorm_processing/config/hf_configs.py index 77cbd13f17..f0347c59b2 100644 --- a/graphstorm-processing/graphstorm_processing/config/hf_configs.py +++ b/graphstorm-processing/graphstorm_processing/config/hf_configs.py @@ -16,7 +16,7 @@ from typing import Mapping -from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE +from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE, HUGGINGFACE_EMB from .feature_config_base import FeatureConfig @@ -26,9 +26,9 @@ class HFConfig(FeatureConfig): Supported kwargs ---------------- action: str, required - The type of huggingface action to use. Valid values is "tokenize_hf" - bert_model: str, required - The name of the lm model. + The type of huggingface action to use. Valid values are ["tokenize_hf", "embedding_hf"]. + hf_model: str, required + The name of the huggingface lm model. max_seq_length: int, required The maximal length of the tokenization results. """ @@ -36,7 +36,7 @@ class HFConfig(FeatureConfig): def __init__(self, config: Mapping): super().__init__(config) self.action = self._transformation_kwargs.get("action") - self.bert_model = self._transformation_kwargs.get("bert_model") + self.hf_model = self._transformation_kwargs.get("hf_model") self.max_seq_length = self._transformation_kwargs.get("max_seq_length") self._sanity_check() @@ -44,11 +44,12 @@ def __init__(self, config: Mapping): def _sanity_check(self) -> None: super()._sanity_check() assert self.action in [ - HUGGINGFACE_TOKENIZE - ], f"huggingface action needs to be {HUGGINGFACE_TOKENIZE}" + HUGGINGFACE_TOKENIZE, + HUGGINGFACE_EMB, + ], f"huggingface action needs to be one of {HUGGINGFACE_TOKENIZE, HUGGINGFACE_EMB}" assert isinstance( - self.bert_model, str - ), f"Expect bert_model to be a string, but got {self.bert_model}" + self.hf_model, str + ), f"Expect hf_model to be a string, but got {self.hf_model}" assert ( isinstance(self.max_seq_length, int) and self.max_seq_length > 0 ), f"Expect max_seq_length {self.max_seq_length} be an integer and larger than zero." diff --git a/graphstorm-processing/graphstorm_processing/constants.py b/graphstorm-processing/graphstorm_processing/constants.py index f848d41a50..f849c9c29f 100644 --- a/graphstorm-processing/graphstorm_processing/constants.py +++ b/graphstorm-processing/graphstorm_processing/constants.py @@ -48,3 +48,4 @@ ################# Bert transformations ################ HUGGINGFACE_TRANFORM = "huggingface" HUGGINGFACE_TOKENIZE = "tokenize_hf" +HUGGINGFACE_EMB = "embedding_hf" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py index 20ddecd1f0..cf53d67993 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_hf_transformation.py @@ -14,19 +14,22 @@ limitations under the License. """ +import logging +import os from typing import Sequence import numpy as np +import torch as th from pyspark.sql import DataFrame -from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField +from pyspark.sql.types import ArrayType, IntegerType, FloatType, StructType, StructField from pyspark.sql.functions import udf -from transformers import AutoTokenizer +from transformers import AutoTokenizer, AutoModel, AutoConfig -from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE +from graphstorm_processing.constants import HUGGINGFACE_TOKENIZE, HUGGINGFACE_EMB from .base_dist_transformation import DistributedTransformation def apply_transform( - cols: Sequence[str], action: str, bert_model: str, max_seq_length: int, input_df: DataFrame + cols: Sequence[str], action: str, hf_model: str, max_seq_length: int, input_df: DataFrame ) -> DataFrame: """Applies a single normalizer to the imputed dataframe, individually to each of the columns provided in the cols argument. @@ -36,8 +39,8 @@ def apply_transform( cols : Sequence[str] List of column names to apply normalization to. action : str - The type of normalization to use. Currently we only accept the `tokenize_hf` action. - bert_model : str + The type of normalization to use. Valid values are ["tokenize_hf", "embedding_hf"]. + hf_model : str The name of huggingface model. max_seq_length: int The maximal length of the tokenization results. @@ -47,8 +50,12 @@ def apply_transform( if action == HUGGINGFACE_TOKENIZE: # Initialize the tokenizer - tokenizer = AutoTokenizer.from_pretrained(bert_model) - + tokenizer = AutoTokenizer.from_pretrained(hf_model) + if max_seq_length > tokenizer.model_max_length: + raise RuntimeError( + f"max_seq_length {max_seq_length} is larger " + f"than expected {tokenizer.model_max_length}" + ) # Define the schema of your return type schema = StructType( [ @@ -89,6 +96,60 @@ def tokenize(text): transformed_df[cols[0]].getItem("attention_mask").alias("attention_mask"), transformed_df[cols[0]].getItem("token_type_ids").alias("token_type_ids"), ) + elif action == HUGGINGFACE_EMB: + # Define the schema of your return type + schema = ArrayType(FloatType()) + + if th.cuda.is_available(): + gpu = ( + int(os.environ["CUDA_VISIBLE_DEVICES"]) + if "CUDA_VISIBLE_DEVICES" in os.environ + else 0 + ) + device = f"cuda:{gpu}" + else: + device = "cpu" + logging.warning("The device to run huggingface transformation is %s", device) + tokenizer = AutoTokenizer.from_pretrained(hf_model) + if max_seq_length > tokenizer.model_max_length: + raise RuntimeError( + f"max_seq_length {max_seq_length} is larger " + f"than expected {tokenizer.model_max_length}" + ) + config = AutoConfig.from_pretrained(hf_model) + lm_model = AutoModel.from_pretrained(hf_model, config) + lm_model.eval() + lm_model = lm_model.to(device) + + # Define UDF + @udf(returnType=schema) + def lm_emb(text): + # Check if text is a string + if not isinstance(text, str): + raise ValueError("The input of the tokenizer has to be a string.") + + # Tokenize the text + outputs = tokenizer( + text, + max_length=max_seq_length, + truncation=True, + padding="max_length", + return_tensors="pt", + ) + token_type_ids = outputs.get("token_type_ids") + if token_type_ids is None: + token_type_ids = torch.zeros_like(outputs["input_ids"], dtype=torch.int8) + with th.no_grad(): + lm_outputs = lm_model( + input_ids=outputs["input_ids"].to(device), + attention_mask=outputs["attention_mask"].to(device).long(), + token_type_ids=token_type_ids.to(device).long(), + ) + embeddings = lm_outputs.pooler_output.cpu().squeeze().numpy() + return embeddings.tolist() + + # Apply the UDF to the DataFrame + transformed_df = input_df.select(lm_emb(input_df[cols[0]]).alias(cols[0])) else: raise ValueError(f"The input action needs to be {HUGGINGFACE_TOKENIZE}") @@ -103,26 +164,26 @@ class DistHFTransformation(DistributedTransformation): cols : Sequence[str] List of column names to apply normalization to. action : str - The type of huggingface action to use. Valid values is "tokenize" - bert_model: str, required + The type of huggingface action to use. Valid values are ["tokenize_hf", "embedding_hf"]. + hf_model: str, required The name of the lm model. max_seq_length: int, required The maximal length of the tokenization results. """ def __init__( - self, cols: Sequence[str], action: str, bert_model: str, max_seq_length: int + self, cols: Sequence[str], action: str, hf_model: str, max_seq_length: int ) -> None: super().__init__(cols) self.cols = cols assert len(self.cols) == 1, "Huggingface transformation only supports single column" self.action = action - self.bert_model = bert_model + self.hf_model = hf_model self.max_seq_length = max_seq_length def apply(self, input_df: DataFrame) -> DataFrame: transformed_df = apply_transform( - self.cols, self.action, self.bert_model, self.max_seq_length, input_df + self.cols, self.action, self.hf_model, self.max_seq_length, input_df ) return transformed_df diff --git a/graphstorm-processing/pyproject.toml b/graphstorm-processing/pyproject.toml index 165706aa97..bf0955f0dc 100644 --- a/graphstorm-processing/pyproject.toml +++ b/graphstorm-processing/pyproject.toml @@ -12,7 +12,6 @@ authors = [ python = "~3.9.12" pyspark = ">=3.3.0, < 3.5.0" pyarrow = "~13.0.0" -spacy = "3.6.0" boto3 = "~1.28.1" joblib = "^1.3.1" pandas = "^1.3.5" @@ -20,6 +19,12 @@ psutil = "^5.9.5" sagemaker = "^2.83.0" scipy = "^1.10.1" transformers = "^4.37.1" +torch = [ + { url = "https://download.pytorch.org/whl/cpu/torch-2.1.2%2Bcpu-cp39-cp39-linux_x86_64.whl", markers = "sys_platform == 'linux' and platform_machine != 'aarch64'"}, + { url = "https://download.pytorch.org/whl/cpu/torch-2.1.2%2Bcpu-cp39-cp39-linux_x86_64.whl", markers = "sys_platform == 'darwin' and platform_machine != 'arm64'"}, + { url = "https://download.pytorch.org/whl/cpu/torch-2.1.2-cp39-none-macosx_11_0_arm64.whl", markers = "sys_platform == 'darwin' and platform_machine == 'arm64'"}, + { url = "https://download.pytorch.org/whl/cpu/torch-2.1.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", markers = "sys_platform == 'linux' and platform_machine == 'aarch64'"}, +] [tool.poetry.group.dev] optional = true @@ -27,7 +32,6 @@ optional = true [tool.poetry.group.dev.dependencies] pytest = ">=7.4.0" mock = ">=5.0.2" -en-core-web-lg = {url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.6.0/en_core_web_lg-3.6.0-py3-none-any.whl"} coverage = ">=7.0.0" sphinx = ">=6.0.0" mypy = ">=1.0.0" diff --git a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json index 6b7a8d5b9e..d9458ad317 100644 --- a/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json +++ b/graphstorm-processing/tests/resources/small_heterogeneous_graph/gsprocessing-config.json @@ -62,7 +62,7 @@ "name": "huggingface", "kwargs": { "action": "tokenize_hf", - "bert_model": "bert-base-uncased", + "hf_model": "bert-base-uncased", "max_seq_length":16 } } diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index a910dc2622..a4f64591ef 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -57,7 +57,7 @@ def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node node_dict["nodes"][0]["features"] = [ { "feature_col": ["paper_title"], - "transform": {"name": "bert_hf"}, + "transform": {"name": "unknown"}, } ] @@ -244,6 +244,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "max_seq_length": 64, }, }, + { + "feature_col": ["citation_name"], + "transform": { + "name": "bert_hf", + "bert_model": "bert", + "max_seq_length": 64, + }, + }, ], "labels": [ {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]} @@ -334,7 +342,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "column": "citation_name", "transformation": { "name": "huggingface", - "kwargs": {"action": "tokenize_hf", "bert_model": "bert", "max_seq_length": 64}, + "kwargs": {"action": "tokenize_hf", "hf_model": "bert", "max_seq_length": 64}, + }, + }, + { + "column": "citation_name", + "transformation": { + "name": "huggingface", + "kwargs": {"action": "embedding_hf", "hf_model": "bert", "max_seq_length": 64}, }, }, ] diff --git a/graphstorm-processing/tests/test_dist_huggingface_transformation.py b/graphstorm-processing/tests/test_dist_huggingface_transformation.py index 0982f46cf0..b96c6161fd 100644 --- a/graphstorm-processing/tests/test_dist_huggingface_transformation.py +++ b/graphstorm-processing/tests/test_dist_huggingface_transformation.py @@ -18,7 +18,8 @@ from pyspark.sql import DataFrame, SparkSession import numpy as np from numpy.testing import assert_array_equal -from transformers import AutoTokenizer +import torch as th +from transformers import AutoTokenizer, AutoConfig, AutoModel from graphstorm_processing.data_transformations.dist_transformations import ( DistHFTransformation, @@ -37,11 +38,11 @@ def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): input_df = spark.createDataFrame(data, schema=columns) # Configuration for Hugging Face tokenizer transformation - bert_model = "bert-base-uncased" + hf_model = "bert-base-uncased" max_seq_length = 8 # Initialize and apply the distributed Hugging Face tokenization transformation - hf_tokenize = DistHFTransformation(["occupation"], "tokenize_hf", bert_model, max_seq_length) + hf_tokenize = DistHFTransformation(["occupation"], "tokenize_hf", hf_model, max_seq_length) output_df = hf_tokenize.apply(input_df) assert ( len(output_df.columns) == 3 @@ -57,7 +58,7 @@ def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): # Tokenize the original text data for validation original_text = [row[1] for row in data] - tokenizer = AutoTokenizer.from_pretrained(bert_model) + tokenizer = AutoTokenizer.from_pretrained(hf_model) tokenized_data = tokenizer( original_text, max_length=max_seq_length, @@ -72,3 +73,61 @@ def test_hf_tokenizer_example(spark: SparkSession, check_df_schema): assert_array_equal( row[0], expected_output[idx], err_msg=f"Row {idx} for {feature} is not equal" ) + + +def test_hf_emb_example(spark: SparkSession, check_df_schema): + # Prepare test data and DataFrame + data = [ + ("mark", "doctor", None), + ("john", "scientist", 10000), + ("tara", "engineer", 20000), + ("jen", "nurse", 10000), + ] + columns = ["name", "occupation", "salary"] + input_df = spark.createDataFrame(data, schema=columns) + + # Configuration for Hugging Face tokenizer transformation + hf_model = "bert-base-uncased" + max_seq_length = 8 + + # Initialize and apply the distributed Hugging Face tokenization transformation + hf_emb = DistHFTransformation(["occupation"], "embedding_hf", hf_model, max_seq_length) + output_df = hf_emb.apply(input_df) + + check_df_schema(output_df) + + # Collect the output data for comparison + output_data = output_df.collect() + + # Tokenize the original text data for validation + original_text = [row[1] for row in data] + tokenizer = AutoTokenizer.from_pretrained(hf_model) + config = AutoConfig.from_pretrained(hf_model) + lm_model = AutoModel.from_pretrained(hf_model, config) + lm_model.eval() + lm_model = lm_model.to("cpu") + + embeddings_list = [] + for text in original_text: + outputs = tokenizer(text, return_tensors="pt") + tokens = outputs["input_ids"] + att_masks = outputs["attention_mask"] + token_types = outputs.get("token_type_ids") + with th.no_grad(): + if token_types is not None: + outputs = lm_model( + tokens.to("cpu"), + attention_mask=att_masks.to("cpu"), + token_type_ids=token_types.to("cpu"), + ) + else: + outputs = lm_model(tokens.to("cpu"), attention_mask=att_masks.to("cpu")) + embeddings = outputs.pooler_output.cpu().squeeze().numpy() + embeddings_list.append(embeddings) + + # Compare the Spark DataFrame output with the expected tokenizer output + expected_output = embeddings_list + for idx, row in enumerate(output_data): + np.testing.assert_almost_equal( + row[0], expected_output[idx], decimal=3, err_msg=f"Row {idx} is not equal" + )