From cbfaa625cdbf205c21bcf739170414bf2ea6d3f6 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 10 Jun 2024 22:15:47 +0300 Subject: [PATCH 1/4] [GSProcessing] Add ability to re-apply pre-computed categorical transformation. --- .../dist_feature_transformer.py | 19 ++-- .../base_dist_transformation.py | 32 +++++++ .../dist_category_transformation.py | 96 +++++++++++++++++-- .../distributed_executor.py | 48 +++++----- .../dist_heterogeneous_loader.py | 59 ++++++++---- graphstorm-processing/pyproject.toml | 1 + .../test_dist_category_transformation.py | 30 +++++- .../tests/test_dist_executor.py | 76 ++++++++++++++- .../tests/test_dist_heterogenous_loader.py | 74 +++++++------- 9 files changed, 346 insertions(+), 89 deletions(-) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py index 9ef2fd0a33..b68c3eeb96 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_feature_transformer.py @@ -44,7 +44,7 @@ def __init__( feat_name = feature_config.feat_name args_dict = feature_config.transformation_kwargs self.transformation: DistributedTransformation - # TODO: We will use this to re-apply transformations + # We use this to re-apply transformations self.json_representation = json_representation default_kwargs = { @@ -63,7 +63,7 @@ def __init__( self.transformation = DistBucketNumericalTransformation(**default_kwargs, **args_dict) elif feat_type == "categorical": self.transformation = DistCategoryTransformation( - **default_kwargs, **args_dict, spark=spark + **default_kwargs, **args_dict, spark=spark, json_representation=json_representation ) elif feat_type == "multi-categorical": self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict) @@ -88,10 +88,17 @@ def apply_transformation(self, input_df: DataFrame) -> tuple[DataFrame, dict]: """ input_df = input_df.select(self.transformation.cols) # type: ignore - return ( - self.transformation.apply(input_df), - self.transformation.get_json_representation(), - ) + if self.json_representation: + logging.info("Applying precomputed transformation...") + return ( + self.transformation.apply_precomputed_transformation(input_df), + self.json_representation, + ) + else: + return ( + self.transformation.apply(input_df), + self.transformation.get_json_representation(), + ) def get_transformation_name(self) -> str: """ diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py index 221bdce0f7..893cf48649 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py @@ -16,6 +16,7 @@ from abc import ABC, abstractmethod from typing import Optional, Sequence +import logging from pyspark.sql import DataFrame, SparkSession @@ -23,6 +24,15 @@ class DistributedTransformation(ABC): """ Base class for all distributed transformations. 
+ + Parameters + ---------- + cols : Sequence[str] + Column names to which we will apply the transformation + spark : Optional[SparkSession], optional + Optional SparkSession if needed by the underlying implementation, by default None + json_representation : Optional[dict], optional + Pre-computed transformation representation to use, by default None """ def __init__( @@ -52,6 +62,28 @@ def get_json_representation(self) -> dict: else: return {} + def apply_precomputed_transformation(self, input_df: DataFrame) -> DataFrame: + """Applies a transformation using pre-computed representation. + + Parameters + ---------- + input_df : DataFrame + Input DataFrame to apply the transformation to. + + Returns + ------- + DataFrame + The input DataFrame, modified according to the pre-computed transformation values. + """ + logging.warning( + ( + "Transformation %s does not support pre-existing transform" + ", applying new transformation" + ), + self.get_transformation_name(), + ) + return self.apply(input_df) + @staticmethod @abstractmethod def get_transformation_name() -> str: diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py index 8dfd5ac49f..2f372f2d3c 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py @@ -14,17 +14,20 @@ limitations under the License. """ +from collections import defaultdict from typing import List, Optional, Sequence +from functools import partial import numpy as np import pandas as pd -from pyspark.sql import DataFrame, functions as F, SparkSession -from pyspark.sql.functions import when -from pyspark.sql.types import ArrayType, FloatType, StringType from pyspark.ml.feature import StringIndexer, OneHotEncoder from pyspark.ml.functions import vector_to_array from pyspark.ml.linalg import Vectors +from pyspark.sql import DataFrame, functions as F, SparkSession +from pyspark.sql.functions import when +from pyspark.sql.types import ArrayType, FloatType, StringType +from pyspark.sql.types import IntegerType from graphstorm_processing.constants import ( MAX_CATEGORIES_PER_FEATURE, @@ -41,8 +44,12 @@ class DistCategoryTransformation(DistributedTransformation): Transforms categorical features into a vector of one-hot-encoded values. 
""" - def __init__(self, cols: list[str], spark: SparkSession) -> None: - super().__init__(cols, spark) + def __init__( + self, cols: list[str], spark: SparkSession, json_representation: Optional[dict] = None + ) -> None: + if not json_representation: + json_representation = {} + super().__init__(cols, spark, json_representation) @staticmethod def get_transformation_name() -> str: @@ -51,6 +58,7 @@ def get_transformation_name() -> str: def apply(self, input_df: DataFrame) -> DataFrame: processed_col_names = [] top_categories_per_col: dict[str, list] = {} + for current_col in self.cols: processed_col_names.append(current_col + "_processed") distinct_category_counts = input_df.groupBy(current_col).count() # type: DataFrame @@ -157,7 +165,7 @@ def apply(self, input_df: DataFrame) -> DataFrame: # see get_json_representation() docstring for structure self.json_representation = { - "string_indexer_labels_array": str_indexer_model.labelsArray, + "string_indexer_labels_arrays": str_indexer_model.labelsArray, "cols": self.cols, "per_col_label_to_one_hot_idx": per_col_label_to_one_hot_idx, "transformation_name": self.get_transformation_name(), @@ -165,6 +173,82 @@ def apply(self, input_df: DataFrame) -> DataFrame: return dense_vector_features + def apply_precomputed_transformation(self, input_df: DataFrame) -> DataFrame: + + # Get JSON representation of categorical transformation + labels_arrays: list[list[str]] = self.json_representation["string_indexer_labels_arrays"] + per_col_label_to_one_hot_idx: dict[str, dict[str, int]] = self.json_representation[ + "per_col_label_to_one_hot_idx" + ] + precomputed_cols: list[str] = self.json_representation["cols"] + + # Assertions to ensure correctness of representation + assert set(precomputed_cols) == set(self.cols), ( + f"Mismatched columns in precomputed transformation: " + f"pre-computed cols: {sorted(precomputed_cols)}, " + f"columns in current config: {sorted(self.cols)}" + ) + for col_labels, col in zip(labels_arrays, precomputed_cols): + for idx, label in enumerate(col_labels): + assert idx == per_col_label_to_one_hot_idx[col][label], ( + "Mismatch between Spark labelsArray and pre-computed array index " + f"for col {col}, string: {label}, " + f"{idx} != {per_col_label_to_one_hot_idx[col][label]}" + ) + + # For each column in the transformation, we create a defaultdict + # with each unique value as keys, and the one-hot vector encoding + # of the value as value. 
Values not in the dict get the all zeroes (missing) + # vector + # Do this for each column in the transformation and return the resulting DF + + # We need to define these outside the loop to avoid + # https://pylint.readthedocs.io/en/latest/user_guide/messages/warning/cell-var-from-loop.html + def replace_col_in_row(val: str, str_to_vec: dict): + return str_to_vec[val] + + def create_zeroes_list(vec_size: int): + return [0] * vec_size + + transformed_df = None + already_transformed_cols = [] + remaining_cols = list(self.cols) + + for col_idx, current_col in enumerate(precomputed_cols): + vector_size = len(labels_arrays[col_idx]) + # Mapping from string to one-hot vector, + # with all-zeroes default for unknown/missing values + string_to_vector = defaultdict(partial(create_zeroes_list, vector_size)) + + string_to_one_hot_idx = per_col_label_to_one_hot_idx[current_col] + + # Populate the one-hot vectors for known strings + for string_val, one_hot_idx in string_to_one_hot_idx.items(): + one_hot_vec = [0] * vector_size + one_hot_vec[one_hot_idx] = 1 + string_to_vector[string_val] = one_hot_vec + + # UDF that replaces strings values with their one-hot encoding (ohe) + replace_cur_col = partial(replace_col_in_row, str_to_vec=string_to_vector) + replace_cur_col_udf = F.udf(replace_cur_col, ArrayType(IntegerType())) + + partial_df = transformed_df if transformed_df else input_df + + transformed_col = f"{current_col}_ohe" + remaining_cols.remove(current_col) + # We maintain only the already transformed cols, and the ones yet to be transformed + transformed_df = partial_df.select( + replace_cur_col_udf(F.col(current_col)).alias(transformed_col), + *remaining_cols, + *already_transformed_cols, + ).drop(current_col) + already_transformed_cols.append(transformed_col) + + assert transformed_df + transformed_df = transformed_df.select(*already_transformed_cols).toDF(*self.cols) + + return transformed_df + def get_json_representation(self) -> dict: """Representation of the single-category transformation for one or more columns. diff --git a/graphstorm-processing/graphstorm_processing/distributed_executor.py b/graphstorm-processing/graphstorm_processing/distributed_executor.py index ffd2ee92b1..1b2d5fa88c 100644 --- a/graphstorm-processing/graphstorm_processing/distributed_executor.py +++ b/graphstorm-processing/graphstorm_processing/distributed_executor.py @@ -242,6 +242,25 @@ def __init__( # Create the Spark session for execution self.spark = spark_utils.create_spark_session(self.execution_env, self.filesystem_type) + # Initialize the graph loader + data_configs = create_config_objects(self.gsp_config_dict) + loader_config = HeterogeneousLoaderConfig( + add_reverse_edges=self.add_reverse_edges, + data_configs=data_configs, + enable_assertions=False, + graph_name=self.graph_name, + input_prefix=self.input_prefix, + local_input_path=self.local_config_path, + local_metadata_output_path=self.local_metadata_output_path, + num_output_files=self.num_output_files, + output_prefix=self.output_prefix, + precomputed_transformations=self.precomputed_transformations, + ) + self.loader = DistHeterogeneousGraphLoader( + self.spark, + loader_config, + ) + def _upload_output_files(self, loader: DistHeterogeneousGraphLoader, force=False): """Upload output files to S3 @@ -273,27 +292,10 @@ def run(self) -> None: Executes the Spark processing job. 
""" logging.info("Performing data processing with PySpark...") - data_configs = create_config_objects(self.gsp_config_dict) t0 = time.time() - # Prefer explicit arguments for clarity - loader_config = HeterogeneousLoaderConfig( - add_reverse_edges=self.add_reverse_edges, - data_configs=data_configs, - enable_assertions=False, - graph_name=self.graph_name, - input_prefix=self.input_prefix, - local_input_path=self.local_config_path, - local_metadata_output_path=self.local_metadata_output_path, - num_output_files=self.num_output_files, - output_prefix=self.output_prefix, - precomputed_transformations=self.precomputed_transformations, - ) - loader = DistHeterogeneousGraphLoader( - self.spark, - loader_config, - ) - processed_representations: ProcessedGraphRepresentation = loader.load() + + processed_representations: ProcessedGraphRepresentation = self.loader.load() graph_meta_dict = processed_representations.processed_graph_metadata_dict t1 = time.time() @@ -343,7 +345,9 @@ def run(self) -> None: # If any of the metadata modification took place, write an updated metadata file if updated_metadata: - updated_meta_path = os.path.join(loader.output_path, "updated_row_counts_metadata.json") + updated_meta_path = os.path.join( + self.loader.output_path, "updated_row_counts_metadata.json" + ) with open( updated_meta_path, "w", @@ -384,7 +388,7 @@ def run(self) -> None: # since we can't rely on SageMaker to do it if self.filesystem_type == FilesystemType.S3: self._upload_output_files( - loader, force=(not self.execution_env == ExecutionEnv.SAGEMAKER) + self.loader, force=(not self.execution_env == ExecutionEnv.SAGEMAKER) ) def _merge_config_with_transformations( @@ -406,7 +410,7 @@ def _merge_config_with_transformations( "node_features": { "node_type1": { "feature_name1": { - "transformation": # transformation type + "transformation_name": # transformation name, e.g. "numerical" # feature1 representation goes here }, "feature_name2": {}, ... 
diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index e0a1832c22..593a785c92 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -25,7 +25,6 @@ from time import perf_counter from typing import Any, Dict, Optional, Set, Tuple -from pyspark import RDD from pyspark.sql import Row, SparkSession, DataFrame, functions as F from pyspark.sql.types import ( StructType, @@ -49,16 +48,21 @@ HUGGINGFACE_TOKENIZE, TRANSFORMATIONS_FILENAME, ) -from ..config.config_parser import EdgeConfig, NodeConfig, StructureConfig -from ..config.label_config_base import LabelConfig -from ..config.feature_config_base import FeatureConfig -from ..data_transformations.dist_feature_transformer import DistFeatureTransformer -from ..data_transformations.dist_label_loader import DistLabelLoader, SplitRates, CustomSplit -from ..data_transformations import s3_utils, spark_utils +from graphstorm_processing.config.config_parser import EdgeConfig, NodeConfig, StructureConfig +from graphstorm_processing.config.label_config_base import LabelConfig +from graphstorm_processing.config.feature_config_base import FeatureConfig +from graphstorm_processing.data_transformations.dist_feature_transformer import ( + DistFeatureTransformer, +) +from graphstorm_processing.data_transformations.dist_label_loader import ( + CustomSplit, + DistLabelLoader, + SplitRates, +) +from graphstorm_processing.data_transformations import s3_utils, spark_utils -# TODO: Remove the pylint disable once we add the rest of the code -from . import schema_utils # pylint: disable=no-name-in-module -from .row_count_utils import ParquetRowCounter # pylint: disable=no-name-in-module +from . 
import schema_utils +from .row_count_utils import ParquetRowCounter FORMAT_NAME = "parquet" DELIMITER = "" if FORMAT_NAME == "parquet" else "," @@ -207,7 +211,10 @@ def __init__( # }, # "edges_features": {...} # } - self.transformation_representations = {"node_features": {}, "edge_features": {}} + self.transformation_representations = { + "node_features": defaultdict(dict), + "edge_features": defaultdict(dict), + } self.graph_name = loader_config.graph_name self.skip_train_masks = False self.pre_computed_transformations = loader_config.precomputed_transformations @@ -1029,16 +1036,26 @@ def _process_node_features( """ node_type_feature_metadata = {} ntype_feat_sizes = {} # type: Dict[str, int] + for feat_conf in feature_configs: - json_representation = self.transformation_representations.get(feat_conf.feat_name, {}) + json_representation = ( + self.pre_computed_transformations.get("node_features", {}) + .get(node_type, {}) + .get(feat_conf.feat_name, {}) + ) transformer = DistFeatureTransformer(feat_conf, self.spark, json_representation) + if json_representation: + logging.info( + "Will apply pre-computed transformation for feature: %s", feat_conf.feat_name + ) + transformed_feature_df, json_representation = transformer.apply_transformation(nodes_df) transformed_feature_df.cache() - # Will evaluate False for empty dict + # Will evaluate False for empty dict, only create representations when needed if json_representation: - self.transformation_representations["node_features"][ + self.transformation_representations["node_features"][node_type][ feat_conf.feat_name ] = json_representation @@ -1425,6 +1442,7 @@ def process_edge_data(self, edge_configs: Sequence[EdgeConfig]) -> Tuple[Dict, D logging.info("No features or labels for edge type: %s", edge_type) # With features or labels else: + # TODO: Add unit tests for this branch relation_col = edge_config.rel_col edge_type_metadata_dicts = {} @@ -1493,6 +1511,7 @@ def _process_edge_features( for each feature, which tells us the column size of the encoded features. """ + # TODO: Add unit tests for edge feature processing edge_feature_metadata_dicts = {} etype_feat_sizes = {} # type: Dict[str, int] for feat_conf in feature_configs: @@ -1501,14 +1520,22 @@ def _process_edge_features( feat_conf.feat_name, edge_type, ) - json_representation = self.transformation_representations.get(feat_conf.feat_name, {}) + json_representation = ( + self.pre_computed_transformations.get("edges_features", {}) + .get(edge_type, {}) + .get(feat_conf.feat_name, {}) + ) transformer = DistFeatureTransformer(feat_conf, self.spark, json_representation) + if json_representation: + logging.info( + "Will apply pre-computed transformation for feature: %s", feat_conf.feat_name + ) transformed_feature_df, json_representation = transformer.apply_transformation(edges_df) transformed_feature_df.cache() # Will evaluate False for empty dict if json_representation: - self.transformation_representations["node_features"][ + self.transformation_representations["edge_features"][edge_type][ feat_conf.feat_name ] = json_representation diff --git a/graphstorm-processing/pyproject.toml b/graphstorm-processing/pyproject.toml index dd83dcadcf..de2b8cff77 100644 --- a/graphstorm-processing/pyproject.toml +++ b/graphstorm-processing/pyproject.toml @@ -40,6 +40,7 @@ black = "~24.2.0" pre-commit = "^3.3.3" types-mock = "^5.1.0.1" pylint = "~2.17.5" +diff-cover = "^9.0.0" [project] requires-python = ">=3.9" # TODO: Do we need a tilde here? 
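# --- Illustrative sketch, not part of the diff ---------------------------------
# Behaviour of the string -> one-hot mapping built in
# DistCategoryTransformation.apply_precomputed_transformation (patch above):
# known categories map to their one-hot vector, while any value that was not
# present when the transformation was first computed falls back to the
# all-zeroes "missing" vector supplied by the defaultdict factory.
from collections import defaultdict
from functools import partial


def create_zeroes_list(vec_size: int) -> list:
    return [0] * vec_size


labels = ["wa", "ca", "ny"]  # example labelsArray for a single column
vector_size = len(labels)

string_to_vector = defaultdict(partial(create_zeroes_list, vector_size))
for one_hot_idx, label in enumerate(labels):
    one_hot_vec = [0] * vector_size
    one_hot_vec[one_hot_idx] = 1
    string_to_vector[label] = one_hot_vec

assert string_to_vector["ca"] == [0, 1, 0]
assert string_to_vector["oregon"] == [0, 0, 0]  # unseen value -> missing vector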
diff --git a/graphstorm-processing/tests/test_dist_category_transformation.py b/graphstorm-processing/tests/test_dist_category_transformation.py index 14b1ff99c5..0fb6b04b21 100644 --- a/graphstorm-processing/tests/test_dist_category_transformation.py +++ b/graphstorm-processing/tests/test_dist_category_transformation.py @@ -131,7 +131,7 @@ def test_multiple_single_cat_cols_json(user_df, spark): multi_cols_rep = dist_category_transformation.get_json_representation() - labels_array = multi_cols_rep["string_indexer_labels_array"] + labels_arrays = multi_cols_rep["string_indexer_labels_arrays"] one_hot_index_for_string = multi_cols_rep["per_col_label_to_one_hot_idx"] cols = multi_cols_rep["cols"] name = multi_cols_rep["transformation_name"] @@ -139,11 +139,37 @@ def test_multiple_single_cat_cols_json(user_df, spark): assert name == "DistCategoryTransformation" # The Spark-generated and our own one-hot-index mappings should match - for col_labels, col in zip(labels_array, cols): + for col_labels, col in zip(labels_arrays, cols): for idx, label in enumerate(col_labels): assert idx == one_hot_index_for_string[col][label] +def test_apply_precomputed_single_cat_cols(user_df, spark): + """Test applying precomputed transformation for single-cat columns""" + dist_category_transformation = DistCategoryTransformation(["occupation", "gender"], spark) + + original_transformed_df = dist_category_transformation.apply(user_df) + + multi_cols_rep = dist_category_transformation.get_json_representation() + + precomputed_transformation = DistCategoryTransformation( + ["occupation", "gender"], spark, multi_cols_rep + ) + + precomp_transformed_df = precomputed_transformation.apply_precomputed_transformation(user_df) + + occupation_distinct_values = original_transformed_df.select("occupation").distinct().count() + gender_distinct_values = original_transformed_df.select("gender").distinct().count() + + precomp_occupation_distinct_values = ( + precomp_transformed_df.select("occupation").distinct().count() + ) + precomp_gender_distinct_values = precomp_transformed_df.select("gender").distinct().count() + + assert occupation_distinct_values == precomp_occupation_distinct_values + assert gender_distinct_values == precomp_gender_distinct_values + + def test_multi_category_transformation(multi_cat_df_and_separator, check_df_schema): """Test transforming single multi-category column""" df, separator = multi_cat_df_and_separator diff --git a/graphstorm-processing/tests/test_dist_executor.py b/graphstorm-processing/tests/test_dist_executor.py index f99e087491..b88ac38ab6 100644 --- a/graphstorm-processing/tests/test_dist_executor.py +++ b/graphstorm-processing/tests/test_dist_executor.py @@ -14,15 +14,19 @@ limitations under the License. 
""" +import json import os import shutil import tempfile +from unittest import mock import pytest from graphstorm_processing.distributed_executor import DistributedExecutor, ExecutorConfig -from graphstorm_processing.constants import ExecutionEnv, FilesystemType +from graphstorm_processing.constants import TRANSFORMATIONS_FILENAME, FilesystemType, ExecutionEnv +from test_dist_heterogenous_loader import verify_integ_test_output, NODE_CLASS_GRAPHINFO_UPDATES +pytestmark = pytest.mark.usefixtures("spark") _ROOT = os.path.abspath(os.path.dirname(__file__)) @@ -36,7 +40,29 @@ def tempdir_fixture(): shutil.rmtree(tempdirectory) -def test_merge_input_and_transform_dicts(tempdir: str): +def precomp_json_file(local_input, precomp_filename): + """Copy precomputed json to local input dir""" + precomp_file = shutil.copy( + os.path.join(_ROOT, "resources", "precomputed_transformations", precomp_filename), + os.path.join(local_input, TRANSFORMATIONS_FILENAME), + ) + return precomp_file + + +@pytest.fixture(name="user_state_categorical_precomp_file") +def user_state_categorical_precomp_file_fixture(): + """Copy precomputed user->state feature transformation to local input dir""" + precomp_file = precomp_json_file( + os.path.join(_ROOT, "resources/small_heterogeneous_graph"), + "user_state_categorical_transformation.json", + ) + + yield precomp_file + + os.remove(precomp_file) + + +def test_dist_executor_run_with_precomputed(tempdir: str, user_state_categorical_precomp_file): """Test run function with local data""" input_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph") executor_configuration = ExecutorConfig( @@ -53,6 +79,50 @@ def test_merge_input_and_transform_dicts(tempdir: str): do_repartition=True, ) + original_precomp_file = user_state_categorical_precomp_file + + with open(original_precomp_file, "r", encoding="utf-8") as f: + original_transformations = json.load(f) + + dist_executor = DistributedExecutor(executor_configuration) + + # Mock the SparkContext stop() function to leave the Spark context running + # for the other tests, otherwise dist_executor stops it + dist_executor.spark.stop = mock.MagicMock(name="stop") + + dist_executor.run() + + with open(os.path.join(tempdir, "metadata.json"), "r", encoding="utf-8") as mfile: + metadata = json.load(mfile) + + verify_integ_test_output(metadata, dist_executor.loader, NODE_CLASS_GRAPHINFO_UPDATES) + + with open(os.path.join(tempdir, TRANSFORMATIONS_FILENAME), "r", encoding="utf-8") as f: + reapplied_transformations = json.load(f) + + # There should be no difference between original and re-applied transformation dicts + assert reapplied_transformations == original_transformations + + # TODO: Verify other metadata files that verify_integ_test_output doesn't check for + + +def test_merge_input_and_transform_dicts(tempdir: str): + """Test the _merge_config_with_transformations function with hardcoded json data""" + input_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph") + executor_configuration = ExecutorConfig( + local_config_path=input_path, + local_metadata_output_path=tempdir, + input_prefix=input_path, + output_prefix=tempdir, + num_output_files=-1, + config_filename="gsprocessing-config.json", + execution_env=ExecutionEnv.LOCAL, + filesystem_type=FilesystemType.LOCAL, + add_reverse_edges=True, + graph_name="small_heterogeneous_graph", + do_repartition=True, + ) + dist_executor = DistributedExecutor(executor_configuration) pre_comp_transormations = { @@ -71,7 +141,7 @@ def 
test_merge_input_and_transform_dicts(tempdir: str): pre_comp_transormations, ) - # Ensure the "user" node type's "age" feature includes a transformation entry + # Ensure the "user" node type's "state" feature includes a transformation entry for node_input_dict in input_config_with_transforms["graph"]["nodes"]: if "user" == node_input_dict["type"]: for feature in node_input_dict["features"]: diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index 068aed50bb..f63756e790 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -55,6 +55,32 @@ LABEL_COL = "label" NUM_DATAPOINTS = 10000 +NODE_CLASS_GRAPHINFO_UPDATES = { + "nfeat_size": { + "user": { + "age": 1, + "attention_mask": 16, + "input_ids": 16, + "token_type_ids": 16, + "multi": 2, + "state": 3, + } + }, + "efeat_size": {}, + "etype_label": [], + "etype_label_property": [], + "ntype_label": ["user"], + "ntype_label_property": ["gender"], + "task_type": "node_class", + "label_map": {"male": 0, "female": 1}, + "label_properties": { + "user": { + "COLUMN_NAME": "gender", + "VALUE_COUNTS": {"male": 3, "female": 1, "null": 1}, + } + }, +} + @pytest.fixture(autouse=True, name="tempdir") def tempdir_fixture(): @@ -201,9 +227,12 @@ def verify_integ_test_output( expected_node_counts = {"director": 3, "genre": 2, "movie": 4, "user": 5} # TODO: The following Parquet reads assume there's only one file in the output for node_type in metadata["node_type"]: - nrows = pq.ParquetFile( - os.path.join(loader.output_path, metadata["raw_id_mappings"][node_type]["data"][0]) - ).metadata.num_rows + nrows = pq.read_table( + os.path.join( + loader.output_path, + os.path.dirname(metadata["raw_id_mappings"][node_type]["data"][0]), + ) + ).num_rows assert nrows == expected_node_counts[node_type] expected_edge_counts = { @@ -216,9 +245,11 @@ def verify_integ_test_output( } for edge_type in metadata["edge_type"]: - nrows = pq.ParquetFile( - os.path.join(loader.output_path, metadata["edges"][edge_type]["data"][0]) - ).metadata.num_rows + nrows = pq.read_table( + os.path.join( + loader.output_path, os.path.dirname(metadata["edges"][edge_type]["data"][0]) + ) + ).num_rows assert nrows == expected_edge_counts[edge_type] shared_expected_graphinfo = { @@ -255,33 +286,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade ) as mfile: metadata = json.load(mfile) - graphinfo_updates = { - "nfeat_size": { - "user": { - "age": 1, - "attention_mask": 16, - "input_ids": 16, - "token_type_ids": 16, - "multi": 2, - "state": 3, - } - }, - "efeat_size": {}, - "etype_label": [], - "etype_label_property": [], - "ntype_label": ["user"], - "ntype_label_property": ["gender"], - "task_type": "node_class", - "label_map": {"male": 0, "female": 1}, - "label_properties": { - "user": { - "COLUMN_NAME": "gender", - "VALUE_COUNTS": {"male": 3, "female": 1, "null": 1}, - } - }, - } - - verify_integ_test_output(metadata, dghl_loader, graphinfo_updates) + verify_integ_test_output(metadata, dghl_loader, NODE_CLASS_GRAPHINFO_UPDATES) expected_node_data = { "user": { @@ -306,7 +311,8 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade ) as transformation_file: transformations_dict = json.load(transformation_file) - assert "state" in transformations_dict["node_features"] + assert "user" in transformations_dict["node_features"] + assert "state" in 
transformations_dict["node_features"]["user"] def test_load_dist_hgl_without_labels( From 225b7acbd8fa932d99c9f282483ef877addc3941 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 10 Jun 2024 22:52:01 +0000 Subject: [PATCH 2/4] Add documentation and small fixes --- .../gs-processing-getting-started.rst | 16 +++++++++ docs/source/gs-processing/usage/example.rst | 33 ++++++++++++++----- ...user_state_categorical_transformation.json | 19 +++++++++++ 3 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 graphstorm-processing/tests/resources/precomputed_transformations/user_state_categorical_transformation.json diff --git a/docs/source/gs-processing/gs-processing-getting-started.rst b/docs/source/gs-processing/gs-processing-getting-started.rst index 048adc1de2..5f94b0a403 100644 --- a/docs/source/gs-processing/gs-processing-getting-started.rst +++ b/docs/source/gs-processing/gs-processing-getting-started.rst @@ -161,6 +161,22 @@ GSProcessing supports both the GConstruct JSON configuration format, as well as its own GSProcessing config. You can learn about the GSProcessing JSON configuration in :doc:`developer/input-configuration`. +Re-applying feature transformations to new data +----------------------------------------------- + +Often you will process your data at training time and run inference at later +dates. If your data changes in the meantime. e.g. new values appear in a +categorical feature, you'll need to ensure no new values appear in the transformed +data, as the trained model relies on pre-existing values only. + +To achieve that, GSProcessing creates an additional file in the output, +named ``precomputed_transformations.json``. To ensure the same transformations +are applied to new data, you can copy this file to the top-level path of your +new input data, and GSProcessing will pick up any transformations there to ensure +the produced data match the ones that were used to train your model. + +Currently, we only support re-applying transformations for categorical features. + Developer guide --------------- diff --git a/docs/source/gs-processing/usage/example.rst b/docs/source/gs-processing/usage/example.rst index 076b9e75a4..a7c502a0ed 100644 --- a/docs/source/gs-processing/usage/example.rst +++ b/docs/source/gs-processing/usage/example.rst @@ -173,7 +173,7 @@ we can use the following command to run the processing job locally: .. code-block:: bash - gs-processing --config-filename gconstruct-config.json \ + gs-processing --config-filename gsprocessing-config.json \ --input-prefix ./tests/resources/small_heterogeneous_graph \ --output-prefix /tmp/gsprocessing-example/ \ --do-repartition True @@ -211,26 +211,41 @@ and can be used downstream to create a partitioned graph. .. code-block:: bash $ cd /tmp/gsprocessing-example - $ ls - - edges/ launch_arguments.json metadata.json node_data/ - raw_id_mappings/ perf_counters.json updated_row_counts_metadata.json + $ ls -l + + edges/ + gconstruct-config_with_transformations.json + launch_arguments.json + metadata.json + node_data/ + perf_counters.json + precomputed_transformations.json + raw_id_mappings/ + updated_row_counts_metadata.json We have a few JSON files and the data directories containing the graph structure, features, and labels. In more detail: +* ``gsprocessing-config_with_transformations.json``: This is the input configuration + we used, modified to include representations of any supported transformations + we applied. This file can be used to re-apply the transformations on new data. 
* ``launch_arguments.json``: Contains the arguments that were used to launch the processing job, allowing you to check the parameters after the job finishes. -* ``updated_row_counts_metadata.json``: - This file is meant to be used as the input configuration for the - distributed partitioning pipeline. ``gs-repartition`` produces - this file using the original ``metadata.json`` file as input. * ``metadata.json``: Created by ``gs-processing`` and used as input for ``gs-repartition``, can be removed the ``gs-repartition`` step. * ``perf_counters.json``: A JSON file that contains runtime measurements for the various components of GSProcessing. Can be used to profile the application and discover bottlenecks. +* ``precomputed_transformations.json``: A JSON file that contains representations + of supported transformations. This file can be copied to the input path of another + set of raw files, and GSProcessing will use the transformation values listed here + instead of creating new ones. Use this to re-apply the same transformations to new + data. +* ``updated_row_counts_metadata.json``: + This file is meant to be used as the input configuration for the + distributed partitioning pipeline. ``gs-repartition`` produces + this file using the original ``metadata.json`` file as input. The directories created contain: diff --git a/graphstorm-processing/tests/resources/precomputed_transformations/user_state_categorical_transformation.json b/graphstorm-processing/tests/resources/precomputed_transformations/user_state_categorical_transformation.json new file mode 100644 index 0000000000..e2b665d2d7 --- /dev/null +++ b/graphstorm-processing/tests/resources/precomputed_transformations/user_state_categorical_transformation.json @@ -0,0 +1,19 @@ +{ + "node_features": { + "user": { + "state": { + "transformation_name": "DistCategoryTransformation", + "string_indexer_labels_arrays": [["wa", "ca", "ny"]], + "cols": ["state"], + "per_col_label_to_one_hot_idx": { + "state": { + "wa": 0, + "ca": 1, + "ny": 2 + } + } + } + } + }, + "edge_features": {} +} \ No newline at end of file From 5109f7682aa6c03fe87710d13949ba4fa33489f1 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 13 Jun 2024 19:15:12 +0000 Subject: [PATCH 3/4] Fix doc and add col difference printing for cat re-apply --- docs/source/gs-processing/usage/example.rst | 2 +- .../dist_transformations/dist_category_transformation.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/source/gs-processing/usage/example.rst b/docs/source/gs-processing/usage/example.rst index a7c502a0ed..cc44bcd57b 100644 --- a/docs/source/gs-processing/usage/example.rst +++ b/docs/source/gs-processing/usage/example.rst @@ -214,7 +214,7 @@ and can be used downstream to create a partitioned graph. 
$ ls -l edges/ - gconstruct-config_with_transformations.json + gsprocessing-config_with_transformations.json launch_arguments.json metadata.json node_data/ diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py index 2f372f2d3c..1a73fd1dc2 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py @@ -186,7 +186,8 @@ def apply_precomputed_transformation(self, input_df: DataFrame) -> DataFrame: assert set(precomputed_cols) == set(self.cols), ( f"Mismatched columns in precomputed transformation: " f"pre-computed cols: {sorted(precomputed_cols)}, " - f"columns in current config: {sorted(self.cols)}" + f"columns in current config: {sorted(self.cols)}, " + f"different items: {set(precomputed_cols).symmetric_difference(set(self.cols))}" ) for col_labels, col in zip(labels_arrays, precomputed_cols): for idx, label in enumerate(col_labels): From a276a44ad09c708a198ba602299e126208d3ecfd Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Fri, 14 Jun 2024 22:23:09 +0000 Subject: [PATCH 4/4] Apply review comments --- docs/source/gs-processing/usage/example.rst | 11 +++++++---- .../base_dist_transformation.py | 14 ++++++-------- .../dist_category_transformation.py | 6 +++++- .../graph_loaders/dist_heterogeneous_loader.py | 3 +++ 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/docs/source/gs-processing/usage/example.rst b/docs/source/gs-processing/usage/example.rst index cc44bcd57b..70f5bc8bac 100644 --- a/docs/source/gs-processing/usage/example.rst +++ b/docs/source/gs-processing/usage/example.rst @@ -238,10 +238,13 @@ the graph structure, features, and labels. In more detail: for the various components of GSProcessing. Can be used to profile the application and discover bottlenecks. * ``precomputed_transformations.json``: A JSON file that contains representations - of supported transformations. This file can be copied to the input path of another - set of raw files, and GSProcessing will use the transformation values listed here - instead of creating new ones. Use this to re-apply the same transformations to new - data. + of supported transformations. To re-use these transformations on another dataset, + place this file in the top level of another set of raw data, at the same level + as the input GSProcessing/GConstruct configuration JSON file. + GSProcessing will use the transformation values listed here + instead of creating new ones, ensuring that models trained with the original + data can still be used in the newly transformed data. Currently only + categorical transformations can be re-applied. * ``updated_row_counts_metadata.json``: This file is meant to be used as the input configuration for the distributed partitioning pipeline. 
``gs-repartition`` produces diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py index 893cf48649..d5c2fa46ad 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/base_dist_transformation.py @@ -16,7 +16,6 @@ from abc import ABC, abstractmethod from typing import Optional, Sequence -import logging from pyspark.sql import DataFrame, SparkSession @@ -74,15 +73,14 @@ def apply_precomputed_transformation(self, input_df: DataFrame) -> DataFrame: ------- DataFrame The input DataFrame, modified according to the pre-computed transformation values. + Raises + ------ + NotImplementedError + If the underlying transformation does not support re-applying using JSON input. """ - logging.warning( - ( - "Transformation %s does not support pre-existing transform" - ", applying new transformation" - ), - self.get_transformation_name(), + raise NotImplementedError( + f"Pre-computed transformation not available for {self.get_transformation_name()}" ) - return self.apply(input_df) @staticmethod @abstractmethod diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py index 1a73fd1dc2..1eae9b600b 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/dist_category_transformation.py @@ -175,11 +175,15 @@ def apply(self, input_df: DataFrame) -> DataFrame: def apply_precomputed_transformation(self, input_df: DataFrame) -> DataFrame: - # Get JSON representation of categorical transformation + # List of StringIndexerModel labelsArray lists, each one containing the strings + # for one column. See docs for pyspark.ml.feature.StringIndexerModel.labelsArray labels_arrays: list[list[str]] = self.json_representation["string_indexer_labels_arrays"] + # More verbose representation of the mapping from string to one hot index location, + # for each column in the input. per_col_label_to_one_hot_idx: dict[str, dict[str, int]] = self.json_representation[ "per_col_label_to_one_hot_idx" ] + # The list of cols the transformation was originally applied to. precomputed_cols: list[str] = self.json_representation["cols"] # Assertions to ensure correctness of representation diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index 593a785c92..a3ac726deb 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -1038,6 +1038,9 @@ def _process_node_features( ntype_feat_sizes = {} # type: Dict[str, int] for feat_conf in feature_configs: + # This will get a value iff there exists a pre-computed representation + # for this feature name and node type, an empty dict (which evaluates to False) + # otherwise. We do the same for the edges. 
json_representation = ( self.pre_computed_transformations.get("node_features", {}) .get(node_type, {})
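# --- Illustrative usage sketch, not part of the diff ---------------------------
# End-to-end re-application of transformations, mirroring the test added in
# test_dist_executor.py above. The paths and graph name are assumptions made
# for the example; the ExecutorConfig arguments follow the test in this patch.
import shutil

from graphstorm_processing.constants import (
    TRANSFORMATIONS_FILENAME,  # "precomputed_transformations.json" per the docs above
    ExecutionEnv,
    FilesystemType,
)
from graphstorm_processing.distributed_executor import DistributedExecutor, ExecutorConfig

original_output = "/path/to/original/gsprocessing/output"  # hypothetical
new_input = "/path/to/new/raw/data"                        # hypothetical
new_output = "/tmp/gsprocessing-reapply"                   # hypothetical

# 1. Copy the transformations file produced by the original run next to the
#    GSProcessing/GConstruct config of the new data.
shutil.copy(
    f"{original_output}/{TRANSFORMATIONS_FILENAME}",
    f"{new_input}/{TRANSFORMATIONS_FILENAME}",
)

# 2. Run the executor as usual: it merges the pre-computed representations into
#    the input config, and the loader re-applies them instead of fitting new ones.
executor_config = ExecutorConfig(
    local_config_path=new_input,
    local_metadata_output_path=new_output,
    input_prefix=new_input,
    output_prefix=new_output,
    num_output_files=-1,
    config_filename="gsprocessing-config.json",
    execution_env=ExecutionEnv.LOCAL,
    filesystem_type=FilesystemType.LOCAL,
    add_reverse_edges=True,
    graph_name="my-graph",  # hypothetical
    do_repartition=True,
)
DistributedExecutor(executor_config).run()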