diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst
index e6e2d7ae98..17045bd450 100644
--- a/docs/source/gs-processing/developer/input-configuration.rst
+++ b/docs/source/gs-processing/developer/input-configuration.rst
@@ -12,8 +12,8 @@ between other config formats, such as the one used
 by the single-machine GConstruct module.
 
 GSProcessing can take a GConstruct-formatted file
-directly, and we also provide `a script `
-that can convert a `GConstruct `
+directly, and we also provide `a script `_
+that can convert a `GConstruct `_
 input configuration file into the ``GSProcessing`` format,
-although this is mostly aimed at developers, users are can
-rely on the automatic conversion.
+although this is mostly aimed at developers; users can
+rely on the automatic conversion.
@@ -30,11 +30,11 @@ The GSProcessing input data configuration has two top-level objects:
 
 - ``version`` (String, required): The version of configuration file being used.
   We include the package name to allow self-contained identification of the file format.
 - ``graph`` (JSON object, required): one configuration object that defines each
-  of the node types and edge types that describe the graph.
+  of the edge and node types that constitute the graph.
 
 We describe the ``graph`` object next.
 
-``graph`` configuration object
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Contents of the ``graph`` configuration object
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 The ``graph`` configuration object can have two top-level objects:
 
@@ -135,13 +135,12 @@ objects:
-  ``source`` key, with a JSON object that contains ``{“column: String,
-  and ”type“: String}``.
+  ``source`` key, with a JSON object that contains
+  ``{"column": String, "type": String}``.
 - ``relation``: (JSON object, required): Describes the relation
-  modeled by the edges. A relation can be common among all edges, or it
-  can have sub-types. The top-level objects for the object are:
+  modeled by the edges. The top-level keys for the object are:
 
   - ``type`` (String, required): The type of the relation described by
     the edges. For example, for a source type ``user``, destination
-    ``movie`` we can have a relation type ``interacted_with`` for an
-    edge type ``user:interacted_with:movie``.
+    ``movie`` we can have a relation type ``rated`` for an
+    edge type ``user:rated:movie``.
 
 - ``labels`` (List of JSON objects, optional): Describes the label
   for the current edge type. The label object has the following
@@ -171,9 +170,9 @@ objects:
 
     - ``train``: The percentage of the data with available labels to
       assign to the train set (0.0, 1.0].
     - ``val``: The percentage of the data with available labels to
-      assign to the train set [0.0, 1.0).
+      assign to the validation set [0.0, 1.0).
     - ``test``: The percentage of the data with available labels to
-      assign to the train set [0.0, 1.0).
+      assign to the test set [0.0, 1.0).
 
 - ``features`` (List of JSON objects, optional)\ **:** Describes
   the set of features for the current edge type. See the
   :ref:`features-object` section for details.
@@ -248,12 +247,12 @@ following top-level keys:
 
     - ``train``: The percentage of the data with available labels to
       assign to the train set (0.0, 1.0].
     - ``val``: The percentage of the data with available labels to
-      assign to the train set [0.0, 1.0).
+      assign to the validation set [0.0, 1.0).
     - ``test``: The percentage of the data with available labels to
-      assign to the train set [0.0, 1.0).
+      assign to the test set [0.0, 1.0).
 
 - ``features`` (List of JSON objects, optional): Describes
-  the set of features for the current edge type. See the next section, :ref:`features-object`
+  the set of features for the current node type. See the next section, :ref:`features-object`
   for details.
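+
+  As a sketch, a label entry with a custom split could look like the
+  following (the ``field`` column name is hypothetical):
+
+  .. code-block:: json
+
+     {
+         "column": "field",
+         "type": "classification",
+         "split_rate": {
+             "train": 0.8,
+             "val": 0.1,
+             "test": 0.1
+         }
+     }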
 
 --------------
 
@@ -285,7 +284,7 @@ can contain the following top-level keys:
 
    }
 
 - ``column`` (String, required): The column that contains the raw
-  feature values in the dataset
+  feature values in the data.
 - ``transformation`` (JSON object, optional): The type of transformation
   that will be applied to the feature. For details on the individual
   transformations supported see :ref:`supported-transformations`.
@@ -309,7 +308,7 @@ can contain the following top-level keys:
 
    # Example node config with multiple features
    {
-       # This is where the node structure data exist just need an id col
+       # This is where the node structure data exists; these files only need an id column
        "data": {
            "format": "parquet",
            "files": ["path/to/node_ids"]
        },
@@ -356,7 +355,7 @@ Supported transformations
 
 In this section we'll describe the transformations we support.
 The name of the transformation is the value that would appear
-in the ``transform['name']`` element of the feature configuration,
+in the ``transformation['name']`` element of the feature configuration,
 with the attached ``kwargs`` for the transformations that support
 arguments.
@@ -373,6 +372,35 @@ arguments.
     split the values in the column and create a vector column output.
     Example: for a separator ``'|'`` the CSV value ``1|2|3`` would
     be transformed to a vector, ``[1, 2, 3]``.
+- ``numerical``
+
+  - Transforms a numerical column using a missing data imputer and an
+    optional normalizer.
+  - ``kwargs``:
+
+    - ``imputer`` (String, optional): A method to fill in missing values in the data.
+      Valid values are: ``mean`` (Default), ``median``, and ``most_frequent``.
+      Missing values will be replaced with the respective value computed from the data.
+    - ``normalizer`` (String, optional): Applies a normalization to the data, after
+      imputation. Can take the following values:
+
+      - ``none``: (Default) Don't normalize the numerical values during encoding.
+      - ``min-max``: Normalize each value by subtracting the minimum value from it,
+        and then dividing it by the difference between the maximum value and the minimum.
+      - ``standard``: Normalize each value by dividing it by the sum of all the values.
+- ``multi-numerical``
+
+  - Column-wise transformation for vector-like numerical data using a missing data
+    imputer and an optional normalizer.
+  - ``kwargs``:
+
+    - ``imputer`` (String, optional): Same as for the ``numerical`` transformation;
+      ``mean`` imputation is applied by default.
+    - ``normalizer`` (String, optional): Same as for the ``numerical`` transformation;
+      no normalization is applied by default.
+    - ``separator`` (String, optional): Same as for the ``no-op`` transformation, used
+      to separate numerical values in CSV input. If the input data are in Parquet format,
+      each value in the column is assumed to be an array of floats.
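+
+  - For example, a sketch of a feature entry using this transformation over a
+    CSV column (the column name is hypothetical):
+
+    .. code-block:: json
+
+       {
+           "column": "ratings",
+           "transformation": {
+               "name": "multi-numerical",
+               "kwargs": {
+                   "imputer": "median",
+                   "normalizer": "min-max",
+                   "separator": "|"
+               }
+           }
+       }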
 
 --------------
 
@@ -403,6 +431,8 @@ OAG-Paper dataset
         ],
         "nodes" : [
             {
+                "type": "paper",
+                "column": "ID",
                 "data": {
                     "format": "csv",
                     "separator": ",",
                     "files": [
                         "node_feat.csv"
                     ]
                 },
-                "type": "paper",
-                "column": "ID",
+                "features": [
+                    {
+                        "column": "n_citation",
+                        "transformation": {
+                            "name": "numerical",
+                            "kwargs": {
+                                "imputer": "mean",
+                                "normalizer": "min-max"
+                            }
+                        }
+                    }
+                ],
                 "labels": [
                     {
                         "column": "field",
diff --git a/docs/source/index.rst b/docs/source/index.rst
index c3af321dea..eb55c766c4 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -23,6 +23,7 @@ Welcome to the GraphStorm Documentation and Tutorials
 
    gs-processing/usage/example
    gs-processing/usage/distributed-processing-setup
    gs-processing/usage/amazon-sagemaker
+   gs-processing/developer/input-configuration
 
 .. toctree::
    :maxdepth: 1
diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py
index 8ac9a4bfa1..dd67dbc049 100644
--- a/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py
+++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/converter_base.py
@@ -89,8 +89,7 @@ def convert_to_gsprocessing(self, input_dictionary: dict) -> dict:
 
         gsprocessing_dict: dict[str, Any] = {}
 
-        # hardcode the version number for the first version
-        gsprocessing_dict["version"] = "gsprocessing-1.0"
+        gsprocessing_dict["version"] = "gsprocessing-v1.0"
         gsprocessing_dict["graph"] = {}
 
         # deal with nodes
diff --git a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
index 341037748d..56d5edeea0 100644
--- a/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
+++ b/graphstorm-processing/graphstorm_processing/config/config_conversion/gconstruct_converter.py
@@ -13,6 +13,8 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+from typing import Any
+
 from .converter_base import ConfigConverter
 from .meta_configuration import NodeConfig, EdgeConfig
 
@@ -80,27 +82,40 @@ def _convert_feature(feats: list[dict]) -> list[dict]:
         list[dict]
             The feature information in the GSProcessing format
         """
-        feats_list = []
+        gsp_feats_list = []
         if feats in [[], [{}]]:
             return []
-        for ele in feats:
-            if "transform" in ele:
-                raise ValueError(
-                    "Currently only support no-op operation, "
-                    "we do not support any other no-op operation"
-                )
-            feat_dict = {}
-            kwargs = {"name": "no-op"}
-            for col in ele["feature_col"]:
-                feat_dict = {"column": col, "transform": kwargs}
-                feats_list.append(feat_dict)
-            if "out_dtype" in ele:
+        for gconstruct_feat_dict in feats:
+            gsp_feat_dict = {}
+            gsp_feat_dict["column"] = gconstruct_feat_dict["feature_col"][0]
+            if "feature_name" in gconstruct_feat_dict:
+                gsp_feat_dict["name"] = gconstruct_feat_dict["feature_name"]
+
+            gsp_transformation_dict: dict[str, Any] = {}
+            if "transform" in gconstruct_feat_dict:
+                gconstruct_transform_dict = gconstruct_feat_dict["transform"]
+
+                if gconstruct_transform_dict["name"] == "max_min_norm":
+                    gsp_transformation_dict["name"] = "numerical"
+                    gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "mean"}
+                # TODO: Add support for other common transformations here
+                else:
+                    raise ValueError(
+                        "Unsupported GConstruct transformation name: "
+                        f"{gconstruct_transform_dict['name']}"
+                    )
+            else:
+                gsp_transformation_dict["name"] = "no-op"
+
+            if "out_dtype" in gconstruct_feat_dict:
                 assert (
-                    ele["out_dtype"] == "float32"
+                    gconstruct_feat_dict["out_dtype"] == "float32"
                 ), "GSProcessing currently only supports float32 features"
-            if "feature_name" in ele:
-                feat_dict["name"] = ele["feature_name"]
-        return feats_list
+
+            gsp_feat_dict["transformation"] = gsp_transformation_dict
+            gsp_feats_list.append(gsp_feat_dict)
+
+        return gsp_feats_list
 
     @staticmethod
     def convert_nodes(nodes_entries):
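To make the new conversion concrete, here is a sketch of the mapping that
`_convert_feature` now performs for a `max_min_norm` feature (the dict literals
below mirror the updated test expectations; the column name is illustrative):

    # GConstruct input feature entry (illustrative):
    {"feature_col": ["num_citations"], "transform": {"name": "max_min_norm"}}

    # becomes the GSProcessing feature entry:
    {
        "column": "num_citations",
        "transformation": {
            "name": "numerical",
            "kwargs": {"normalizer": "min-max", "imputer": "mean"},
        },
    }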
""" +from typing import Any + from .converter_base import ConfigConverter from .meta_configuration import NodeConfig, EdgeConfig @@ -80,27 +82,40 @@ def _convert_feature(feats: list[dict]) -> list[dict]: list[dict] The feature information in the GSProcessing format """ - feats_list = [] + gsp_feats_list = [] if feats in [[], [{}]]: return [] - for ele in feats: - if "transform" in ele: - raise ValueError( - "Currently only support no-op operation, " - "we do not support any other no-op operation" - ) - feat_dict = {} - kwargs = {"name": "no-op"} - for col in ele["feature_col"]: - feat_dict = {"column": col, "transform": kwargs} - feats_list.append(feat_dict) - if "out_dtype" in ele: + for gconstruct_feat_dict in feats: + gsp_feat_dict = {} + gsp_feat_dict["column"] = gconstruct_feat_dict["feature_col"][0] + if "feature_name" in gconstruct_feat_dict: + gsp_feat_dict["name"] = gconstruct_feat_dict["feature_name"] + + gsp_transformation_dict: dict[str, Any] = {} + if "transform" in gconstruct_feat_dict: + gconstruct_transform_dict = gconstruct_feat_dict["transform"] + + if gconstruct_transform_dict["name"] == "max_min_norm": + gsp_transformation_dict["name"] = "numerical" + gsp_transformation_dict["kwargs"] = {"normalizer": "min-max", "imputer": "mean"} + # TODO: Add support for other common transformations here + else: + raise ValueError( + "Unsupported GConstruct transformation name: " + f"{gconstruct_transform_dict['name']}" + ) + else: + gsp_transformation_dict["name"] = "no-op" + + if "out_dtype" in gconstruct_feat_dict: assert ( - ele["out_dtype"] == "float32" + gconstruct_feat_dict["out_dtype"] == "float32" ), "GSProcessing currently only supports float32 features" - if "feature_name" in ele: - feat_dict["name"] = ele["feature_name"] - return feats_list + + gsp_feat_dict["transformation"] = gsp_transformation_dict + gsp_feats_list.append(gsp_feat_dict) + + return gsp_feats_list @staticmethod def convert_nodes(nodes_entries): diff --git a/graphstorm-processing/graphstorm_processing/config/config_parser.py b/graphstorm-processing/graphstorm_processing/config/config_parser.py index ae0d4f69e7..4a03ef515d 100644 --- a/graphstorm-processing/graphstorm_processing/config/config_parser.py +++ b/graphstorm-processing/graphstorm_processing/config/config_parser.py @@ -16,12 +16,13 @@ Configuration parsing for edges and nodes """ from abc import ABC - from typing import Any, Dict, List, Optional, Sequence +import logging from graphstorm_processing.constants import SUPPORTED_FILE_TYPES from .label_config_base import LabelConfig, EdgeLabelConfig, NodeLabelConfig from .feature_config_base import FeatureConfig, NoopFeatureConfig +from .numerical_configs import MultiNumericalFeatureConfig, NumericalFeatureConfig from .data_config_base import DataStorageConfig @@ -49,9 +50,14 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig: return NoopFeatureConfig(feature_dict) transformation_name = feature_dict["transformation"]["name"] + logging.info("Transformation name: %s", transformation_name) if transformation_name == "no-op": return NoopFeatureConfig(feature_dict) + elif transformation_name == "numerical": + return NumericalFeatureConfig(feature_dict) + elif transformation_name == "multi-numerical": + return MultiNumericalFeatureConfig(feature_dict) else: raise RuntimeError(f"Unknown transformation name: '{transformation_name}'") diff --git a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py index 
diff --git a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
index d0b4bc28da..85e447bd7b 100644
--- a/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
+++ b/graphstorm-processing/graphstorm_processing/config/feature_config_base.py
@@ -82,8 +82,13 @@ def _sanity_check(self) -> None:
 
 
 class NoopFeatureConfig(FeatureConfig):
-    """
-    Feature configuration for features that do not need to be transformed.
+    """Feature configuration for features that do not need to be transformed.
+
+    Supported kwargs
+    ----------------
+    separator: str
+        When provided, the input is treated as strings; each value is split using the
+        separator, and the resulting list of floats becomes a float-vector feature.
     """
 
     def __init__(self, config: Mapping):
@@ -98,4 +103,4 @@ def __init__(self, config: Mapping):
     def _sanity_check(self) -> None:
         super()._sanity_check()
         if self._data_config and self.value_separator and self._data_config.format != "csv":
-            raise RuntimeError("value_separator should only be provided for CSV data")
+            raise RuntimeError("separator should only be provided for CSV data")
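As a sketch (assuming the feature-dict layout described in the docs above, with a
hypothetical column name), a no-op feature that splits CSV strings into a float
vector would be configured as:

    # "scores" is an illustrative column; separator only applies to CSV input
    NoopFeatureConfig(
        {
            "column": "scores",
            "transformation": {"name": "no-op", "kwargs": {"separator": "|"}},
        }
    )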
""" +import logging + from pyspark.sql import DataFrame from graphstorm_processing.config.feature_config_base import FeatureConfig -from .dist_transformations import DistributedTransformation, NoopTransformation +from .dist_transformations import ( + DistributedTransformation, + NoopTransformation, + DistNumericalTransformation, + DistMultiNumericalTransformation, +) class DistFeatureTransformer(object): @@ -32,9 +39,15 @@ def __init__(self, feature_config: FeatureConfig): self.transformation: DistributedTransformation default_kwargs = {"cols": feature_config.cols} + logging.info("Feature name: %s", feat_name) + logging.info("Transformation type: %s", feat_type) if feat_type == "no-op": self.transformation = NoopTransformation(**default_kwargs, **args_dict) + elif feat_type == "numerical": + self.transformation = DistNumericalTransformation(**default_kwargs, **args_dict) + elif feat_type == "multi-numerical": + self.transformation = DistMultiNumericalTransformation(**default_kwargs, **args_dict) else: raise NotImplementedError( f"Feature {feat_name} has type: {feat_type} that is not supported" diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py index 15de48dbd9..3163b65555 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/dist_transformations/__init__.py @@ -8,3 +8,7 @@ ) from .dist_noop_transformation import NoopTransformation from .dist_label_transformation import DistSingleLabelTransformation, DistMultiLabelTransformation +from .dist_numerical_transformation import ( + DistMultiNumericalTransformation, + DistNumericalTransformation, +) diff --git a/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py b/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py index 600ee37304..4cc60a43fa 100644 --- a/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py +++ b/graphstorm-processing/graphstorm_processing/data_transformations/spark_utils.py @@ -10,11 +10,11 @@ """ import logging import uuid -from typing import Tuple +from typing import Tuple, Sequence import psutil -from pyspark.sql import SparkSession, DataFrame +from pyspark.sql import SparkSession, DataFrame, functions as F from graphstorm_processing import constants try: @@ -172,8 +172,8 @@ def safe_rename_column( Returns ------- - DataFrame - The modified dataframe. + tuple[DataFrame, str] + The modified dataframe and the new column name. """ if old_colum_name in dataframe.columns: if new_column_name in dataframe.columns: @@ -190,3 +190,31 @@ def safe_rename_column( else: logging.warning("Column %s not found in dataframe. Skipping renaming.", old_colum_name) return dataframe, new_column_name + + +def rename_multiple_cols( + df: DataFrame, old_cols: Sequence[str], new_cols: Sequence[str] +) -> Tuple[DataFrame, Sequence[str]]: + """Safely renames multiple columns at once. All columns not listed in the passed args are left as is. + + Parameters + ---------- + df : DataFrame + Input DataFrame + old_cols : Sequence[str] + List of column names to change + new_cols : Sequence[str] + List of new column names. + + Returns + ------- + Tuple[DataFrame, Sequence[str]] + DataFrame with renamed columns, and a list of the new column names. 
+ """ + assert len(old_cols) == len(new_cols) + safe_new_cols = [] + for old_name, new_name in zip(old_cols, new_cols): + _, safe_new_name = safe_rename_column(df, old_name, new_name) + safe_new_cols.append(safe_new_name) + mapping = dict(zip(old_cols, safe_new_cols)) + return df.select([F.col(c).alias(mapping.get(c, c)) for c in df.columns]), safe_new_cols diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py index 2fb37963bb..a37ea81606 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/schema_utils.py @@ -92,11 +92,9 @@ def determine_spark_feature_type(feature_type: str) -> Type[DataType]: In case an unsupported feature_type is provided. """ # TODO: Replace with pattern matching after moving to Python 3.10? - if feature_type in [ - "no-op", - ] or feature_type.startswith("text"): + if feature_type in ["no-op", "multi-numerical"] or feature_type.startswith("text"): return StringType - if feature_type in ["numerical", "bucket_numerical", "none"]: + if feature_type in ["numerical", "none"]: return FloatType else: raise NotImplementedError(f"Unknown feature type: {feature_type}") diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py index ecd23e7961..e2935160c9 100644 --- a/graphstorm-processing/tests/test_converter.py +++ b/graphstorm-processing/tests/test_converter.py @@ -52,12 +52,11 @@ def test_try_read_file_with_wildcard( def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node_dict: dict): - """We currently only support no-op features, so should error out otherwise.""" + """We currently only support no-op and numerical features, so should error out otherwise.""" node_dict["nodes"][0]["features"] = [ { - "feature_col": ["citation_time"], - "feature_name": "feat", - "transform": {"name": "max_min_norm"}, + "feature_col": ["paper_title"], + "transform": {"name": "tokenize_hf"}, } ] @@ -101,7 +100,7 @@ def test_read_node_gconstruct(converter: GConstructConfigConverter, node_dict: d assert node_config.separator is None assert node_config.column == "node_id" assert node_config.features == [ - {"column": "citation_time", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "citation_time", "transformation": {"name": "no-op"}, "name": "feat"} ] assert node_config.labels == [ { @@ -174,7 +173,7 @@ def test_read_edge_gconstruct(converter: GConstructConfigConverter): assert edge_config.separator is None assert edge_config.relation == "writing" assert edge_config.features == [ - {"column": "author", "transform": {"name": "no-op"}, "name": "feat"} + {"column": "author", "transformation": {"name": "no-op"}, "name": "feat"} ] assert edge_config.labels == [ { @@ -206,7 +205,10 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): "files": ["/tmp/acm_raw/nodes/paper.parquet"], "separator": ",", "node_id_col": "node_id", - "features": [{"feature_col": ["citation_time"], "feature_name": "feat"}], + "features": [ + {"feature_col": ["citation_time"], "feature_name": "feat"}, + {"feature_col": ["num_citations"], "transform": {"name": "max_min_norm"}}, + ], "labels": [ {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]} ], @@ -242,7 +244,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter): assert nodes_output["type"] == "paper" assert 
diff --git a/graphstorm-processing/tests/test_converter.py b/graphstorm-processing/tests/test_converter.py
index ecd23e7961..e2935160c9 100644
--- a/graphstorm-processing/tests/test_converter.py
+++ b/graphstorm-processing/tests/test_converter.py
@@ -52,12 +52,11 @@ def test_try_read_file_with_wildcard(
 
 
 def test_try_read_unsupported_feature(converter: GConstructConfigConverter, node_dict: dict):
-    """We currently only support no-op features, so should error out otherwise."""
+    """We currently only support no-op and numerical features, so should error out otherwise."""
     node_dict["nodes"][0]["features"] = [
         {
-            "feature_col": ["citation_time"],
-            "feature_name": "feat",
-            "transform": {"name": "max_min_norm"},
+            "feature_col": ["paper_title"],
+            "transform": {"name": "tokenize_hf"},
         }
     ]
 
@@ -101,7 +100,7 @@ def test_read_node_gconstruct(converter: GConstructConfigConverter, node_dict: d
     assert node_config.separator is None
     assert node_config.column == "node_id"
     assert node_config.features == [
-        {"column": "citation_time", "transform": {"name": "no-op"}, "name": "feat"}
+        {"column": "citation_time", "transformation": {"name": "no-op"}, "name": "feat"}
     ]
     assert node_config.labels == [
         {
@@ -174,7 +173,7 @@ def test_read_edge_gconstruct(converter: GConstructConfigConverter):
     assert edge_config.separator is None
     assert edge_config.relation == "writing"
     assert edge_config.features == [
-        {"column": "author", "transform": {"name": "no-op"}, "name": "feat"}
+        {"column": "author", "transformation": {"name": "no-op"}, "name": "feat"}
     ]
     assert edge_config.labels == [
         {
@@ -206,7 +205,10 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
             "files": ["/tmp/acm_raw/nodes/paper.parquet"],
             "separator": ",",
             "node_id_col": "node_id",
-            "features": [{"feature_col": ["citation_time"], "feature_name": "feat"}],
+            "features": [
+                {"feature_col": ["citation_time"], "feature_name": "feat"},
+                {"feature_col": ["num_citations"], "transform": {"name": "max_min_norm"}},
+            ],
             "labels": [
                 {"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}
             ],
@@ -242,7 +244,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
     assert nodes_output["type"] == "paper"
     assert nodes_output["column"] == "node_id"
     assert nodes_output["features"] == [
-        {"column": "citation_time", "transform": {"name": "no-op"}, "name": "feat"}
+        {"column": "citation_time", "transformation": {"name": "no-op"}, "name": "feat"},
+        {
+            "column": "num_citations",
+            "transformation": {
+                "name": "numerical",
+                "kwargs": {"normalizer": "min-max", "imputer": "mean"},
+            },
+        },
     ]
     assert nodes_output["labels"] == [
         {
@@ -260,7 +269,7 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
     assert edges_output["dest"] == {"column": "~to", "type": "paper"}
     assert edges_output["relation"] == {"type": "writing"}
     assert edges_output["features"] == [
-        {"column": "author", "transform": {"name": "no-op"}, "name": "feat"}
+        {"column": "author", "transformation": {"name": "no-op"}, "name": "feat"}
     ]
     assert edges_output["labels"] == [
         {
diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py
index 13e5038f9f..975a03e7ea 100644
--- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py
+++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py
@@ -58,12 +58,13 @@ def tempdir_fixture():
 
 
 @pytest.fixture(scope="function", name="data_configs_with_label")
 def data_configs_with_label_fixture():
-    """Create data configuration object that contain labels"""
-    config_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph/gconstruct-config.json")
+    """Create data configuration objects that contain features and labels"""
+    config_path = os.path.join(
+        _ROOT, "resources/small_heterogeneous_graph/gsprocessing-config.json"
+    )
 
     with open(config_path, "r", encoding="utf-8") as conf_file:
-        gconstruct_config = json.load(conf_file)
-    gsprocessing_config = GConstructConfigConverter().convert_to_gsprocessing(gconstruct_config)
+        gsprocessing_config = json.load(conf_file)
 
     data_configs_dict = create_config_objects(gsprocessing_config["graph"])
 
@@ -230,7 +231,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade
         metadata = json.load(mfile)
 
     graphinfo_updates = {
-        "nfeat_size": {"user": {"age": 1}},
+        "nfeat_size": {"user": {"age": 1, "multi": 2}},
         "etype_label": [],
         "etype_label_property": [],
         "ntype_label": ["user"],
@@ -245,7 +246,7 @@ def test_load_dist_heterogen_node_class(dghl_loader: DistHeterogeneousGraphLoade
     verify_integ_test_output(metadata, dghl_loader, graphinfo_updates)
 
     expected_node_data = {
-        "user": {"gender", "train_mask", "val_mask", "test_mask", "age"},
+        "user": {"gender", "train_mask", "val_mask", "test_mask", "age", "multi"},
     }
 
     for node_type in metadata["node_data"]:
diff --git a/graphstorm-processing/tests/test_dist_label_loader.py b/graphstorm-processing/tests/test_dist_label_loader.py
index 2df3e2cefb..3ba747d538 100644
--- a/graphstorm-processing/tests/test_dist_label_loader.py
+++ b/graphstorm-processing/tests/test_dist_label_loader.py
@@ -46,10 +46,8 @@ def test_dist_classification_label(spark: SparkSession, check_df_schema):
     label_transformer = DistLabelLoader(LabelConfig(classification_config), spark)
 
     transformed_labels = label_transformer.process_label(names_df)
-    transformed_labels.show()
 
     label_map = label_transformer.label_map
-    print(label_map)
 
     assert set(label_map.keys()) == {"mark", "john", "tara", "jen"}