Skip to content

Commit

Permalink
[GSProcessing] Don't produce masks when no labels present, other smal…
Browse files Browse the repository at this point in the history
…l fixes (#635)

*Issue #, if available:*

*Description of changes:*

* Change default behavior when no labels are given, from creating LP
masks for all edge types, to creating no masks.
* Fixes to docs formatting.
* Fix broken re-partitioning launch argument.
* Add scipy dependency to SageMaker Dockerfile
* Allow custom tags for image names



By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
thvasilo authored Nov 11, 2023
1 parent c85b204 commit a708401
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 97 deletions.
2 changes: 1 addition & 1 deletion docs/source/gs-processing/developer/developer-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ ensure it runs before every commit.
.. note::

The pre-commit hook will also apply to all commits you make to the root
GraphStorm repository. Since that Graphstorm doesn't use ``black``, you might
GraphStorm repository. Since Graphstorm doesn't use ``black``, you might
want to remove the ``black`` hook. You can do so from the root repo
using ``rm -rf .git/hooks``.

Expand Down
28 changes: 19 additions & 9 deletions docs/source/gs-processing/developer/input-configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ The GSProcessing input data configuration has two top-level objects:
We describe the ``graph`` object next.

Contents of the ``graph`` configuration object
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``graph`` configuration object can have two top-level objects:

Expand Down Expand Up @@ -386,11 +386,11 @@ arguments.

- ``none``: (Default) Don't normalize the numerical values during encoding.
- ``min-max``: Normalize each value by subtracting the minimum value from it,
and then dividing it by the difference between the maximum value and the minimum.
and then dividing it by the difference between the maximum value and the minimum.
- ``standard``: Normalize each value by dividing it by the sum of all the values.
- ``rank-gauss``: Normalize each value using Rank-Gauss normalization. Rank-gauss first ranks all values,
converts the ranks to the -1/1 range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
to a Gaussian distribution shape. This transformation only supports a single column as input.
converts the ranks to the -1/1 range, and applies the `inverse of the error function <https://docs.scipy.org/doc/scipy/reference/generated/scipy.special.erfinv.html>`_ to make the values conform
to a Gaussian distribution shape. This transformation only supports a single column as input.
- ``epsilon``: Only relevant for ``rank-gauss``, this epsilon value is added to the denominator
to avoid infinite values during normalization.
- ``multi-numerical``
Expand Down Expand Up @@ -430,8 +430,8 @@ arguments.
- ``categorical``

- Transforms values from a fixed list of possible values (categorical features) to a one-hot encoding.
The length of the resulting vector will be the number of categories in the data minus one, with a 1 in
the index of the single category, and zero everywhere else.
The length of the resulting vector will be the number of categories in the data minus one, with a 1 in
the index of the single category, and zero everywhere else.

.. note::
The maximum number of categories in any categorical feature is 100. If a property has more than 100 categories of value,
Expand All @@ -441,12 +441,22 @@ arguments.

- Encodes vector-like data from a fixed list of possible values (i.e. multi-label/multi-categorical data) using a multi-hot encoding. The length of the resulting vector will be the number of categories in the data minus one, and each value will have a 1 value for every category that appears, and 0 everwhere else.
- ``kwargs``:

- ``separator`` (String, optional): Same as the one in the No-op operation, the separator is used to
split multiple input values for CSV files e.g. ``detective|noir``. If it is not provided, then the whole value
will be considered as an array. For Parquet files, if the input type is ArrayType(StringType()), then the
separator is ignored; if it is StringType(), it will apply same logic as in CSV.
split multiple input values for CSV files e.g. ``detective|noir``. If it is not provided, then the whole value
will be considered as an array. For Parquet files, if the input type is ArrayType(StringType()), then the
separator is ignored; if it is StringType(), it will apply same logic as in CSV.

--------------

Creating a graph for inference
------------------------------

If no label entries are provided for any of the entries
in the input configuration, the processed data will not
include any train/val/test masks. You can use this mode
when you want to produce a graph just for inference.

Examples
~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,4 @@ To get started with developing the package refer to :doc:`developer/developer-gu
.. [#f1] DGL expects that every file produced for a single node/edge type
has matching row counts, which is something that Spark cannot guarantee.
We use the re-partitioning script to fix this where needed in the produced
output.
output. See :doc:`usage/row-count-alignment` for details.
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,11 @@ To build an EMR Serverless GSProcessing image for the ``arm64`` architecture you
.. note::

Building images under emulation using QEMU can be significantly slower than native builds
Building images for the first time under emulation using QEMU
can be significantly slower than native builds
(more than 20 minutes to build the GSProcessing ``arm64`` image).
After the first build, follow up builds that only change the GSProcessing code
will be less than a minute thanks to Docker's caching.
To speed up the build process you can build on an ARM instances,
look into using ``buildx`` with multiple native nodes, or use cross-compilation.
See `the official Docker documentation <https://docs.docker.com/build/building/multi-platform/>`_
Expand Down
19 changes: 15 additions & 4 deletions graphstorm-processing/docker/0.2.1/sagemaker/Dockerfile.cpu
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,21 @@ ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib"
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib"
ENV PATH=/opt/conda/bin:$PATH

# GSProcessing requirements
RUN pipenv install pip==23.1.2 setuptools wheel spacy==3.6.0 pyspark==3.4.1 \
pyarrow==13.0.0 joblib==1.3.1 psutil==5.9.5 pandas==1.3.5 \
boto3==1.28.38 protobuf==3.20.3 mock==5.1.0 \
# Install GSProcessing requirements to pipenv Python
RUN pipenv install \
boto3==1.28.38 \
joblib==1.3.1 \
mock==5.1.0 \
pandas==1.3.5 \
pip==23.1.2 \
protobuf==3.20.3 \
psutil==5.9.5 \
pyarrow==13.0.0 \
pyspark==3.4.1 \
scipy==1.11.3 \
setuptools \
spacy==3.6.0 \
wheel \
&& rm -rf /root/.cache
# Do a pipenv sync so our base libs are independent from our editable code, making them cacheable
RUN pipenv sync --system && python3 -m spacy download en_core_web_lg \
Expand Down
9 changes: 8 additions & 1 deletion graphstorm-processing/docker/build_gsprocessing_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Available options:
-p, --path Path to graphstorm-processing directory, default is the current directory.
-i, --image Docker image name, default is 'graphstorm-processing'.
-v, --version Docker version tag, default is the library's current version (`poetry version --short`)
-s, --suffix Suffix for the image tag, can be used to push custom image tags. Default is "".
-b, --build Docker build directory, default is '/tmp/`
EOF
exit
Expand All @@ -46,6 +47,7 @@ parse_params() {
BUILD_DIR='/tmp'
TARGET='test'
ARCH='x86_64'
SUFFIX=""
while :; do
case "${1-}" in
Expand Down Expand Up @@ -80,6 +82,10 @@ parse_params() {
VERSION="${2-}"
shift
;;
-s | --suffix)
SUFFIX="${2-}"
shift
;;
-?*) die "Unknown option: $1" ;;
*) break ;;
esac
Expand Down Expand Up @@ -128,6 +134,7 @@ msg "- TARGET: ${TARGET}"
msg "- GSP_HOME: ${GSP_HOME}"
msg "- IMAGE_NAME: ${IMAGE_NAME}"
msg "- VERSION: ${VERSION}"
msg "- SUFFIX: ${SUFFIX}"
# Prepare Docker build directory
rm -rf "${BUILD_DIR}/docker/code"
Expand All @@ -151,7 +158,7 @@ cp ${GSP_HOME}/docker-entry.sh "${BUILD_DIR}/docker/code/"
poetry export -f requirements.txt --output "${BUILD_DIR}/docker/requirements.txt"
# Set image name
DOCKER_FULLNAME="${IMAGE_NAME}-${EXEC_ENV}:${VERSION}-${ARCH}"
DOCKER_FULLNAME="${IMAGE_NAME}-${EXEC_ENV}:${VERSION}-${ARCH}${SUFFIX}"
# Login to ECR to be able to pull source SageMaker image
if [[ ${EXEC_ENV} == "sagemaker" ]]; then
Expand Down
22 changes: 14 additions & 8 deletions graphstorm-processing/docker/push_gsprocessing_image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Available options:
-c, --architecture Image architecture. Must be one of 'x86_64' or 'arm64'. Default is 'x86_64'.
-i, --image Docker image name, default is 'graphstorm-processing'.
-v, --version Docker version tag, default is the library's current version (`poetry version --short`)
-s, --suffix Suffix for the image tag, can be used to push custom image tags. Default is "".
-r, --region AWS Region to which we'll push the image. By default will get from aws-cli configuration.
-a, --account AWS Account ID. By default will get from aws-cli configuration.
EOF
Expand All @@ -45,6 +46,7 @@ parse_params() {
REGION=${REGION:-us-west-2}
ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
ARCH='x86_64'
SUFFIX=""


while :; do
Expand All @@ -68,6 +70,10 @@ parse_params() {
VERSION="${2-}"
shift
;;
-s | --suffix)
SUFFIX="${2-}"
shift
;;
-r | --region)
REGION="${2-}"
shift
Expand Down Expand Up @@ -110,13 +116,13 @@ msg "- VERSION: ${VERSION}"
msg "- REGION: ${REGION}"
msg "- ACCOUNT: ${ACCOUNT}"

SUFFIX="${VERSION}-${ARCH}"
LATEST_SUFFIX="latest-${ARCH}"
TAG="${VERSION}-${ARCH}${SUFFIX}"
LATEST_TAG="latest-${ARCH}"
IMAGE_WITH_ENV="${IMAGE}-${EXEC_ENV}"


FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${SUFFIX}"
LATEST_TAG="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${LATEST_SUFFIX}"
FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${TAG}"
LATEST_FULLNAME="${ACCOUNT}.dkr.ecr.${REGION}.amazonaws.com/${IMAGE_WITH_ENV}:${LATEST_TAG}"

# If the repository doesn't exist in ECR, create it.
echo "Getting or creating container repository: ${IMAGE_WITH_ENV}"
Expand All @@ -131,11 +137,11 @@ aws ecr get-login-password --region ${REGION} | \

echo "Pushing image to ${FULLNAME}"

docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${FULLNAME}
docker tag ${IMAGE_WITH_ENV}:${TAG} ${FULLNAME}

docker push ${FULLNAME}

if [ ${VERSION} = ${LATEST_VERSION} ]; then
docker tag ${IMAGE_WITH_ENV}:${SUFFIX} ${LATEST_TAG}
docker push ${LATEST_TAG}
fi
docker tag ${IMAGE_WITH_ENV}:${TAG} ${LATEST_FULLNAME}
docker push ${LATEST_FULLNAME}
fi
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
SPECIAL_CHARACTERS,
)
from ..config.config_parser import EdgeConfig, NodeConfig, StructureConfig
from ..config.label_config_base import LabelConfig, EdgeLabelConfig
from ..config.label_config_base import LabelConfig
from ..config.feature_config_base import FeatureConfig
from ..data_transformations.dist_feature_transformer import DistFeatureTransformer
from ..data_transformations.dist_label_loader import DistLabelLoader, SplitRates
Expand Down Expand Up @@ -136,6 +136,7 @@ def __init__(
self.column_substitutions = {} # type: Dict[str, str]
self.graph_info = {} # type: Dict[str, Any]
self.graph_name = graph_name
self.skip_train_masks = False

def process_and_write_graph_data(
self, data_configs: Mapping[str, Sequence[StructureConfig]]
Expand All @@ -160,7 +161,11 @@ def process_and_write_graph_data(
process_start_time = perf_counter()

if not self._at_least_one_label_exists(data_configs):
self._insert_link_prediction_labels(data_configs["edges"])
logging.warning(
"No labels exist in the dataset, will not produce any masks, "
"and set task to 'link_prediction'."
)
self.skip_train_masks = True

metadata_dict = self._initialize_metadata_dict(data_configs)

Expand Down Expand Up @@ -260,25 +265,6 @@ def _at_least_one_label_exists(data_configs: Mapping[str, Sequence[StructureConf

return False

@staticmethod
def _insert_link_prediction_labels(edge_configs: Sequence[StructureConfig]) -> None:
"""
Inserts a link prediction label entry into the `edges` top-level keys.
Modifies the data_configs object in-place.
Parameters
----------
edge_configs
A sequence of edge structure configurations
"""
for edge_config in edge_configs:
config_dict = {
"column": "",
"type": "link_prediction",
"split_rate": {"train": 0.9, "val": 0.1, "test": 0.0},
}
edge_config.set_labels([EdgeLabelConfig(config_dict)])

def _initialize_metadata_dict(
self, data_configs: Mapping[str, Sequence[StructureConfig]]
) -> Dict:
Expand Down Expand Up @@ -325,6 +311,8 @@ def _initialize_metadata_dict(
return metadata_dict

def _finalize_graphinfo_dict(self, metadata_dict: Dict) -> Dict:
if self.skip_train_masks:
self.graph_info["task_type"] = "link_prediction"
self.graph_info["graph_type"] = "heterogeneous"

self.graph_info["num_nodes"] = sum(metadata_dict["num_nodes_per_type"])
Expand Down Expand Up @@ -1142,7 +1130,9 @@ def write_edge_structure(
# TODO: We need to repartition to ensure same file count for
# all downstream DataFrames, but it causes a full shuffle.
# Can it be avoided?
edge_df_with_int_ids = edge_df_with_int_ids.repartition(self.num_output_files)
edge_df_with_int_ids = edge_df_with_int_ids.drop(src_col, dst_col).repartition(
self.num_output_files
)
edge_df_with_int_ids_and_all_features = edge_df_with_int_ids
edge_df_with_only_int_ids = edge_df_with_int_ids.select(["src_int_id", "dst_int_id"])

Expand Down Expand Up @@ -1447,7 +1437,7 @@ def _process_edge_labels(

self._update_label_properties(edge_type, edges_df, label_conf)
else:
self.graph_info["task_type"] = "link_predict"
self.graph_info["task_type"] = "link_prediction"
logging.info(
"Skipping processing label for '%s' because task is link prediction",
rel_type_prefix,
Expand Down Expand Up @@ -1619,15 +1609,15 @@ def write_mask(kind: str, mask_df: DataFrame) -> Sequence[str]:
)
return out_path_list

train_mask_df = int_group_df.withColumn("train_mask", F.col(group_col_name)[0])
train_mask_df = int_group_df.select(F.col(group_col_name)[0].alias("train_mask"))
out_path_list = write_mask("train", train_mask_df)
split_metadata["train_mask"] = create_metadata_entry(out_path_list)

val_mask_df = int_group_df.withColumn("val_mask", F.col(group_col_name)[1])
val_mask_df = int_group_df.select(F.col(group_col_name)[1].alias("val_mask"))
out_path_list = write_mask("val", val_mask_df)
split_metadata["val_mask"] = create_metadata_entry(out_path_list)

test_mask_df = int_group_df.withColumn("test_mask", F.col(group_col_name)[2])
test_mask_df = int_group_df.select(F.col(group_col_name)[2].alias("test_mask"))
out_path_list = write_mask("test", test_mask_df)
split_metadata["test_mask"] = create_metadata_entry(out_path_list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ def parse_args(args):
default=False,
help="When True will use low-memory file-streaming repartitioning. "
"Note that this option is much slower than the in-memory default.",
choices=["True", "False", "1", "0"],
)
parser.add_argument(
"--metadata-file-name",
Expand Down Expand Up @@ -839,7 +838,7 @@ def main():

edge_structure_meta = metadata_dict["edges"] # type: Dict[str, Dict[str, Dict]]

task_type = metadata_dict["graph_info"]["task_type"] # type: str
task_type = metadata_dict["graph_info"].get("task_type", "link_prediction") # type: str

edge_data_exist = "edge_data" in metadata_dict.keys() and metadata_dict["edge_data"]
node_data_exist = "node_data" in metadata_dict.keys() and metadata_dict["node_data"]
Expand Down
Loading

0 comments on commit a708401

Please sign in to comment.