diff --git a/docs/source/gs-processing/developer/developer-guide.rst b/docs/source/gs-processing/developer/developer-guide.rst
index 385da9ec7d..67efe2f33b 100644
--- a/docs/source/gs-processing/developer/developer-guide.rst
+++ b/docs/source/gs-processing/developer/developer-guide.rst
@@ -222,7 +222,7 @@ ensure it runs before every commit.
 .. note::
 
     The pre-commit hook will also apply to all commits you make to the root
-    GraphStorm repository. Since that Graphstorm doesn't use ``black``, you might
+    GraphStorm repository. Since GraphStorm doesn't use ``black``, you might
     want to remove the ``black`` hook. You can do so from the root repo
     using ``rm -rf .git/hooks``.
 
diff --git a/docs/source/gs-processing/developer/input-configuration.rst b/docs/source/gs-processing/developer/input-configuration.rst
index 8ce897bb56..70e2da28ae 100644
--- a/docs/source/gs-processing/developer/input-configuration.rst
+++ b/docs/source/gs-processing/developer/input-configuration.rst
@@ -35,7 +35,7 @@ The GSProcessing input data configuration has two top-level objects:
 We describe the ``graph`` object next.
 
 Contents of the ``graph`` configuration object
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 The ``graph`` configuration object can have two top-level objects:
 
@@ -386,11 +386,11 @@ arguments.
 
      - ``none``: (Default) Don't normalize the numerical values during encoding.
      - ``min-max``: Normalize each value by subtracting the minimum value from it,
-      and then dividing it by the difference between the maximum value and the minimum.
+        and then dividing it by the difference between the maximum value and the minimum.
      - ``standard``: Normalize each value by dividing it by the sum of all the values.
      - ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-Gauss first ranks all values,
-      converts the ranks to the -1/1 range, and applies the `inverse of the error function `_ to make the values conform
-      to a Gaussian distribution shape. This transformation only supports a single column as input.
+        converts the ranks to the -1/1 range, and applies the `inverse of the error function `_ to make the values conform
+        to a Gaussian distribution shape. This transformation only supports a single column as input.
      - ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
        to avoid infinite values during normalization.
 
  - ``multi-numerical``
@@ -430,8 +430,8 @@ arguments.
 
  - ``categorical``
 
    - Transforms values from a fixed list of possible values (categorical features) to a one-hot encoding.
-    The length of the resulting vector will be the number of categories in the data minus one, with a 1 in
-    the index of the single category, and zero everywhere else.
+      The length of the resulting vector will be the number of categories in the data minus one, with a 1 in
+      the index of the single category, and zero everywhere else.
 
    .. note::
 
        The maximum number of categories in any categorical feature is 100. If a property has more than 100 categories of value,
@@ -441,12 +441,22 @@ arguments.
 
  - ``multi-categorical``
 
    - Encodes vector-like data from a fixed list of possible values (i.e. multi-label/multi-categorical data) using
      a multi-hot encoding. The length of the resulting vector will be the number of categories in the data minus one,
      and each value will have a 1 value for every category that appears, and 0 everywhere else.
    - ``kwargs``:
+
      - ``separator`` (String, optional): Same as the one in the No-op operation, the separator is used to
-      split multiple input values for CSV files e.g. ``detective|noir``. If it is not provided, then the whole value
-      will be considered as an array. For Parquet files, if the input type is ArrayType(StringType()), then the
-      separator is ignored; if it is StringType(), it will apply same logic as in CSV.
+        split multiple input values for CSV files e.g. ``detective|noir``. If it is not provided, then the whole value
+        will be considered as an array. For Parquet files, if the input type is ArrayType(StringType()), then the
+        separator is ignored; if it is StringType(), it will apply the same logic as in CSV.
 
 --------------
 
+Creating a graph for inference
+------------------------------
+
+If no label entries are provided for any of the node or edge entries
+in the input configuration, the processed data will not
+include any train/val/test masks. You can use this mode
+when you want to produce a graph just for inference.
+
 Examples
 ~~~~~~~~
 
diff --git a/docs/source/gs-processing/gs-processing-getting-started.rst b/docs/source/gs-processing/gs-processing-getting-started.rst
index 97f63086b2..4e2427e5b4 100644
--- a/docs/source/gs-processing/gs-processing-getting-started.rst
+++ b/docs/source/gs-processing/gs-processing-getting-started.rst
@@ -180,4 +180,4 @@ To get started with developing the package refer to :doc:`developer/developer-gu
 .. [#f1] DGL expects that every file produced for a single node/edge type
    has matching row counts, which is something that Spark cannot guarantee.
    We use the re-partitioning script to fix this where needed in the produced
-   output.
\ No newline at end of file
+   output. See :doc:`usage/row-count-alignment` for details.
\ No newline at end of file
diff --git a/docs/source/gs-processing/usage/distributed-processing-setup.rst b/docs/source/gs-processing/usage/distributed-processing-setup.rst
index 261c0ce9a9..12ecd88d56 100644
--- a/docs/source/gs-processing/usage/distributed-processing-setup.rst
+++ b/docs/source/gs-processing/usage/distributed-processing-setup.rst
@@ -161,8 +161,11 @@ To build an EMR Serverless GSProcessing image for the ``arm64`` architecture you
 
 .. note::
 
-    Building images under emulation using QEMU can be significantly slower than native builds
+    Building images for the first time under emulation using QEMU
+    can be significantly slower than native builds
     (more than 20 minutes to build the GSProcessing ``arm64`` image).
+    After the first build, follow-up builds that only change the GSProcessing code
+    will take less than a minute thanks to Docker's caching.
     To speed up the build process you can build on an ARM instance, look into using
     ``buildx`` with multiple native nodes, or use cross-compilation.
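+    As a rough sketch, a native ``arm64`` build through ``buildx`` might look like
+    the following; the builder name and SSH host are hypothetical placeholders, and
+    the image tag assumes the ``<image>-<environment>:<version>-<arch>`` naming
+    scheme used by the build script:
+
+    .. code-block:: bash
+
+        # Register a remote arm64 host (hypothetical address) as a native build node
+        docker buildx create --name gsp-arm-builder ssh://ec2-user@my-arm64-host
+
+        # Build the arm64 image on the native node instead of emulating with QEMU
+        docker buildx build --builder gsp-arm-builder --platform linux/arm64 \
+            -t graphstorm-processing-emr-serverless:0.2.1-arm64 --load .
+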
    See `the official Docker documentation `_
diff --git a/graphstorm-processing/docker/0.2.1/sagemaker/Dockerfile.cpu b/graphstorm-processing/docker/0.2.1/sagemaker/Dockerfile.cpu
index c39e21bbf6..13365347fd 100644
--- a/graphstorm-processing/docker/0.2.1/sagemaker/Dockerfile.cpu
+++ b/graphstorm-processing/docker/0.2.1/sagemaker/Dockerfile.cpu
@@ -12,10 +12,21 @@ ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib"
 ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib"
 ENV PATH=/opt/conda/bin:$PATH
 
-# GSProcessing requirements
-RUN pipenv install pip==23.1.2 setuptools wheel spacy==3.6.0 pyspark==3.4.1 \
-    pyarrow==13.0.0 joblib==1.3.1 psutil==5.9.5 pandas==1.3.5 \
-    boto3==1.28.38 protobuf==3.20.3 mock==5.1.0 \
+# Install GSProcessing requirements into the pipenv-managed Python environment
+RUN pipenv install \
+    boto3==1.28.38 \
+    joblib==1.3.1 \
+    mock==5.1.0 \
+    pandas==1.3.5 \
+    pip==23.1.2 \
+    protobuf==3.20.3 \
+    psutil==5.9.5 \
+    pyarrow==13.0.0 \
+    pyspark==3.4.1 \
+    scipy==1.11.3 \
+    setuptools \
+    spacy==3.6.0 \
+    wheel \
     && rm -rf /root/.cache
 # Do a pipenv sync so our base libs are independent from our editable code, making them cacheable
 RUN pipenv sync --system && python3 -m spacy download en_core_web_lg \
diff --git a/graphstorm-processing/docker/build_gsprocessing_image.sh b/graphstorm-processing/docker/build_gsprocessing_image.sh
index 4c53f74416..4f5de008b5 100644
--- a/graphstorm-processing/docker/build_gsprocessing_image.sh
+++ b/graphstorm-processing/docker/build_gsprocessing_image.sh
@@ -22,6 +22,7 @@ Available options:
 -p, --path          Path to graphstorm-processing directory, default is the current directory.
 -i, --image         Docker image name, default is 'graphstorm-processing'.
 -v, --version       Docker version tag, default is the library's current version (`poetry version --short`)
+-s, --suffix        Suffix for the image tag, can be used to push custom image tags. Default is "".
 -b, --build         Docker build directory, default is '/tmp'
 EOF
 exit
@@ -46,6 +47,7 @@ parse_params() {
   BUILD_DIR='/tmp'
   TARGET='test'
   ARCH='x86_64'
+  SUFFIX=""
 
   while :; do
     case "${1-}" in
@@ -80,6 +82,10 @@ parse_params() {
       VERSION="${2-}"
       shift
       ;;
+    -s | --suffix)
+      SUFFIX="${2-}"
+      shift
+      ;;
     -?*) die "Unknown option: $1" ;;
     *) break ;;
     esac
@@ -128,6 +134,7 @@ msg "- TARGET: ${TARGET}"
 msg "- GSP_HOME: ${GSP_HOME}"
 msg "- IMAGE_NAME: ${IMAGE_NAME}"
 msg "- VERSION: ${VERSION}"
+msg "- SUFFIX: ${SUFFIX}"
 
 # Prepare Docker build directory
 rm -rf "${BUILD_DIR}/docker/code"
@@ -151,7 +158,7 @@ cp ${GSP_HOME}/docker-entry.sh "${BUILD_DIR}/docker/code/"
 poetry export -f requirements.txt --output "${BUILD_DIR}/docker/requirements.txt"
 
 # Set image name
-DOCKER_FULLNAME="${IMAGE_NAME}-${EXEC_ENV}:${VERSION}-${ARCH}"
+DOCKER_FULLNAME="${IMAGE_NAME}-${EXEC_ENV}:${VERSION}-${ARCH}${SUFFIX}"
 
 # Login to ECR to be able to pull source SageMaker image
 if [[ ${EXEC_ENV} == "sagemaker" ]]; then
diff --git a/graphstorm-processing/docker/push_gsprocessing_image.sh b/graphstorm-processing/docker/push_gsprocessing_image.sh
index eaab38876a..ae2a1d1973 100644
--- a/graphstorm-processing/docker/push_gsprocessing_image.sh
+++ b/graphstorm-processing/docker/push_gsprocessing_image.sh
@@ -19,6 +19,7 @@ Available options:
 -c, --architecture  Image architecture. Must be one of 'x86_64' or 'arm64'. Default is 'x86_64'.
 -i, --image         Docker image name, default is 'graphstorm-processing'.
 -v, --version       Docker version tag, default is the library's current version (`poetry version --short`)
+-s, --suffix        Suffix for the image tag, can be used to push custom image tags.
Default is "". -r, --region AWS Region to which we'll push the image. By default will get from aws-cli configuration. -a, --account AWS Account ID. By default will get from aws-cli configuration. EOF @@ -45,6 +46,7 @@ parse_params() { REGION=${REGION:-us-west-2} ACCOUNT=$(aws sts get-caller-identity --query Account --output text) ARCH='x86_64' + SUFFIX="" while :; do @@ -68,6 +70,10 @@ parse_params() { VERSION="${2-}" shift ;; + -s | --suffix) + SUFFIX="${2-}" + shift + ;; -r | --region) REGION="${2-}" shift @@ -110,13 +116,13 @@ msg "- VERSION: ${VERSION}" msg "- REGION: ${REGION}" msg "- ACCOUNT: ${ACCOUNT}" -SUFFIX="${VERSION}-${ARCH}" -LATEST_SUFFIX="latest-${ARCH}" +TAG="${VERSION}-${ARCH}${SUFFIX}" +LATEST_TAG="latest-${ARCH}" IMAGE_WITH_ENV="${IMAGE}-${EXEC_ENV}" -FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${SUFFIX}" -LATEST_TAG="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${LATEST_SUFFIX}" +FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${TAG}" +LATEST_FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${LATEST_TAG}" # If the repository doesn't exist in ECR, create it. echo "Getting or creating container repository: ${IMAGE_WITH_ENV}" @@ -131,11 +137,11 @@ aws ecr get-login-password --region ${REGION} | \ echo "Pushing image to ${FULLNAME}" -docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${FULLNAME} +docker tag ${IMAGE_WITH_ENV}:${TAG} ${FULLNAME} docker push ${FULLNAME} if [ ${VERSION} = ${LATEST_VERSION} ]; then - docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${LATEST_TAG} - docker push ${LATEST_TAG} -fi \ No newline at end of file + docker tag ${IMAGE_WITH_ENV}:${TAG} ${LATEST_FULLNAME} + docker push ${LATEST_FULLNAME} +fi diff --git a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py index f02d04ede4..bd00c74c58 100644 --- a/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py +++ b/graphstorm-processing/graphstorm_processing/graph_loaders/dist_heterogeneous_loader.py @@ -42,7 +42,7 @@ SPECIAL_CHARACTERS, ) from ..config.config_parser import EdgeConfig, NodeConfig, StructureConfig -from ..config.label_config_base import LabelConfig, EdgeLabelConfig +from ..config.label_config_base import LabelConfig from ..config.feature_config_base import FeatureConfig from ..data_transformations.dist_feature_transformer import DistFeatureTransformer from ..data_transformations.dist_label_loader import DistLabelLoader, SplitRates @@ -136,6 +136,7 @@ def __init__( self.column_substitutions = {} # type: Dict[str, str] self.graph_info = {} # type: Dict[str, Any] self.graph_name = graph_name + self.skip_train_masks = False def process_and_write_graph_data( self, data_configs: Mapping[str, Sequence[StructureConfig]] @@ -160,7 +161,11 @@ def process_and_write_graph_data( process_start_time = perf_counter() if not self._at_least_one_label_exists(data_configs): - self._insert_link_prediction_labels(data_configs["edges"]) + logging.warning( + "No labels exist in the dataset, will not produce any masks, " + "and set task to 'link_prediction'." 
+ ) + self.skip_train_masks = True metadata_dict = self._initialize_metadata_dict(data_configs) @@ -260,25 +265,6 @@ def _at_least_one_label_exists(data_configs: Mapping[str, Sequence[StructureConf return False - @staticmethod - def _insert_link_prediction_labels(edge_configs: Sequence[StructureConfig]) -> None: - """ - Inserts a link prediction label entry into the `edges` top-level keys. - Modifies the data_configs object in-place. - - Parameters - ---------- - edge_configs - A sequence of edge structure configurations - """ - for edge_config in edge_configs: - config_dict = { - "column": "", - "type": "link_prediction", - "split_rate": {"train": 0.9, "val": 0.1, "test": 0.0}, - } - edge_config.set_labels([EdgeLabelConfig(config_dict)]) - def _initialize_metadata_dict( self, data_configs: Mapping[str, Sequence[StructureConfig]] ) -> Dict: @@ -325,6 +311,8 @@ def _initialize_metadata_dict( return metadata_dict def _finalize_graphinfo_dict(self, metadata_dict: Dict) -> Dict: + if self.skip_train_masks: + self.graph_info["task_type"] = "link_prediction" self.graph_info["graph_type"] = "heterogeneous" self.graph_info["num_nodes"] = sum(metadata_dict["num_nodes_per_type"]) @@ -1142,7 +1130,9 @@ def write_edge_structure( # TODO: We need to repartition to ensure same file count for # all downstream DataFrames, but it causes a full shuffle. # Can it be avoided? - edge_df_with_int_ids = edge_df_with_int_ids.repartition(self.num_output_files) + edge_df_with_int_ids = edge_df_with_int_ids.drop(src_col, dst_col).repartition( + self.num_output_files + ) edge_df_with_int_ids_and_all_features = edge_df_with_int_ids edge_df_with_only_int_ids = edge_df_with_int_ids.select(["src_int_id", "dst_int_id"]) @@ -1447,7 +1437,7 @@ def _process_edge_labels( self._update_label_properties(edge_type, edges_df, label_conf) else: - self.graph_info["task_type"] = "link_predict" + self.graph_info["task_type"] = "link_prediction" logging.info( "Skipping processing label for '%s' because task is link prediction", rel_type_prefix, @@ -1619,15 +1609,15 @@ def write_mask(kind: str, mask_df: DataFrame) -> Sequence[str]: ) return out_path_list - train_mask_df = int_group_df.withColumn("train_mask", F.col(group_col_name)[0]) + train_mask_df = int_group_df.select(F.col(group_col_name)[0].alias("train_mask")) out_path_list = write_mask("train", train_mask_df) split_metadata["train_mask"] = create_metadata_entry(out_path_list) - val_mask_df = int_group_df.withColumn("val_mask", F.col(group_col_name)[1]) + val_mask_df = int_group_df.select(F.col(group_col_name)[1].alias("val_mask")) out_path_list = write_mask("val", val_mask_df) split_metadata["val_mask"] = create_metadata_entry(out_path_list) - test_mask_df = int_group_df.withColumn("test_mask", F.col(group_col_name)[2]) + test_mask_df = int_group_df.select(F.col(group_col_name)[2].alias("test_mask")) out_path_list = write_mask("test", test_mask_df) split_metadata["test_mask"] = create_metadata_entry(out_path_list) diff --git a/graphstorm-processing/graphstorm_processing/repartition_files.py b/graphstorm-processing/graphstorm_processing/repartition_files.py index 30075de73b..7be6667c06 100644 --- a/graphstorm-processing/graphstorm_processing/repartition_files.py +++ b/graphstorm-processing/graphstorm_processing/repartition_files.py @@ -737,7 +737,6 @@ def parse_args(args): default=False, help="When True will use low-memory file-streaming repartitioning. 
" "Note that this option is much slower than the in-memory default.", - choices=["True", "False", "1", "0"], ) parser.add_argument( "--metadata-file-name", @@ -839,7 +838,7 @@ def main(): edge_structure_meta = metadata_dict["edges"] # type: Dict[str, Dict[str, Dict]] - task_type = metadata_dict["graph_info"]["task_type"] # type: str + task_type = metadata_dict["graph_info"].get("task_type", "link_prediction") # type: str edge_data_exist = "edge_data" in metadata_dict.keys() and metadata_dict["edge_data"] node_data_exist = "node_data" in metadata_dict.keys() and metadata_dict["node_data"] diff --git a/graphstorm-processing/tests/test_dist_heterogenous_loader.py b/graphstorm-processing/tests/test_dist_heterogenous_loader.py index 975a03e7ea..f67a52babe 100644 --- a/graphstorm-processing/tests/test_dist_heterogenous_loader.py +++ b/graphstorm-processing/tests/test_dist_heterogenous_loader.py @@ -13,7 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. """ -from typing import Any, Dict, List, Sequence, Tuple +from typing import Any, Dict, List, Tuple import json import os import shutil @@ -33,7 +33,6 @@ from graphstorm_processing.data_transformations.dist_label_loader import SplitRates from graphstorm_processing.config.label_config_base import NodeLabelConfig, EdgeLabelConfig from graphstorm_processing.config.config_parser import ( - StructureConfig, create_config_objects, EdgeConfig, ) @@ -264,15 +263,8 @@ def test_load_dist_hgl_without_labels(dghl_loader_no_label: DistHeterogeneousGra graphinfo_updates = { "nfeat_size": {}, - "task_type": "link_predict", - "etype_label": [ - "movie:included_in:genre", - "genre:included_in-rev:movie", - "user:rated:movie", - "movie:rated-rev:user", - "director:directed:movie", - "movie:directed-rev:director", - ], + "task_type": "link_prediction", + "etype_label": [], "etype_label_property": [], "ntype_label": [], "ntype_label_property": [], @@ -282,28 +274,9 @@ def test_load_dist_hgl_without_labels(dghl_loader_no_label: DistHeterogeneousGra verify_integ_test_output(metadata, dghl_loader_no_label, graphinfo_updates) - expected_edge_data = { - "user:rated:movie": {"train_mask", "val_mask", "test_mask"}, - "movie:rated-rev:user": {"train_mask", "val_mask", "test_mask"}, - "movie:included_in:genre": {"train_mask", "val_mask", "test_mask"}, - "genre:included_in-rev:movie": {"train_mask", "val_mask", "test_mask"}, - "director:directed:movie": {"train_mask", "val_mask", "test_mask"}, - "movie:directed-rev:director": {"train_mask", "val_mask", "test_mask"}, - } + expected_edge_data = {} - for edge_type in metadata["edge_data"]: - assert metadata["edge_data"][edge_type].keys() == expected_edge_data[edge_type] - if not "-rev" in edge_type: - src_type, relation, dst_type = edge_type.split(":") - rev_type = f"{dst_type}:{relation}-rev:{src_type}" - assert ( - metadata["edge_data"][rev_type]["train_mask"] - == metadata["edge_data"][edge_type]["train_mask"] - ) - assert ( - metadata["edge_data"][rev_type]["val_mask"] - == metadata["edge_data"][edge_type]["val_mask"] - ) + assert metadata["edge_data"] == expected_edge_data def test_write_edge_structure_no_reverse_edges( @@ -533,18 +506,6 @@ def test_at_least_one_label_exists(no_label_data_configs, data_configs_with_labe assert DistHeterogeneousGraphLoader._at_least_one_label_exists(data_configs_with_label) -def test_insert_link_prediction_labels(no_label_data_configs: Dict[str, Sequence[StructureConfig]]): - """Test inserting link prediction labels when no labels are 
provided""" - DistHeterogeneousGraphLoader._insert_link_prediction_labels(no_label_data_configs["edges"]) - - modified_edge_configs = no_label_data_configs["edges"] # type: Sequence[StructureConfig] - - for edge_config in modified_edge_configs: - assert edge_config.label_configs - assert edge_config.label_configs[0].task_type == "link_prediction" - assert edge_config.label_configs[0].label_column == "" - - def test_create_split_files_from_rates_empty_col( spark: SparkSession, dghl_loader: DistHeterogeneousGraphLoader, tempdir ):