diff --git a/docs/source/_templates/dataloadertemplate.rst b/docs/source/_templates/dataloadertemplate.rst index f02d586215..515139624d 100644 --- a/docs/source/_templates/dataloadertemplate.rst +++ b/docs/source/_templates/dataloadertemplate.rst @@ -7,4 +7,6 @@ .. autoclass:: {{ name }} :show-inheritance: - :special-members: __iter__, __next__ \ No newline at end of file + :members: + :member-order: bysource + :special-members: __iter__, __next__, __len__ diff --git a/docs/source/_templates/datasettemplate.rst b/docs/source/_templates/datasettemplate.rst index 1225aa3c46..16e04ade54 100644 --- a/docs/source/_templates/datasettemplate.rst +++ b/docs/source/_templates/datasettemplate.rst @@ -7,4 +7,5 @@ .. autoclass:: {{ name }} :show-inheritance: - :members: prepare_data, get_node_feats, get_edge_feats, get_labels, get_node_feat_size + :members: + :member-order: bysource diff --git a/docs/source/api/references/graphstorm.dataloading.rst b/docs/source/api/references/graphstorm.dataloading.rst index 2ce324888c..db95907702 100644 --- a/docs/source/api/references/graphstorm.dataloading.rst +++ b/docs/source/api/references/graphstorm.dataloading.rst @@ -1,17 +1,34 @@ .. _apidataloading: -graphstorm.dataloading -========================== +graphstorm.dataloading.dataset +=============================== - GraphStorm dataloading module includes a set of graph DataSets and DataLoaders for different - graph machine learning tasks. - - If users would like to customize DataLoaders, please extend those classes in the - :ref:`Base DataLoaders ` section and customize their abstract methods. + GraphStorm dataset provides one unified dataset class, i.e., ``GSgnnData``, for all graph + machine learning tasks. Users can build a ``GSgnnData`` object by giving the path of + the JSON file created by the :ref:`GraphStorm Graph Construction` + operations. The ``GSgnnData`` will load the related graph artifacts specified in the JSON + file. It provides a set of APIs for users to extract information of the graph data for + model training and inference. .. currentmodule:: graphstorm.dataloading -.. _basedataloaders: +.. autosummary:: + :toctree: ../generated/ + :nosignatures: + :template: datasettemplate.rst + + GSgnnData + +graphstorm.dataloading.dataloading +=================================== + + GraphStorm dataloading module includes a set of different DataLoaders for + different graph machine learning tasks. + + If users would like to customize DataLoaders, please extend those dataloader base + classes in the **Base DataLoaders** section and customize their abstract functions. + +.. currentmodule:: graphstorm.dataloading Base DataLoaders ------------------- @@ -25,16 +42,6 @@ Base DataLoaders GSgnnEdgeDataLoaderBase GSgnnLinkPredictionDataLoaderBase -DataSets ------------- - -.. autosummary:: - :toctree: ../generated/ - :nosignatures: - :template: datasettemplate.rst - - GSgnnData - DataLoaders ------------ @@ -44,5 +51,8 @@ DataLoaders :template: dataloadertemplate.rst GSgnnNodeDataLoader + GSgnnNodeSemiSupDataLoader GSgnnEdgeDataLoader GSgnnLinkPredictionDataLoader + GSgnnLinkPredictionTestDataLoader + GSgnnLinkPredictionPredefinedTestDataLoader diff --git a/docs/source/graph-construction/gs-processing/example.rst b/docs/source/graph-construction/gs-processing/example.rst index 23afa10ddd..14d2d0e984 100644 --- a/docs/source/graph-construction/gs-processing/example.rst +++ b/docs/source/graph-construction/gs-processing/example.rst @@ -1,41 +1,54 @@ .. _distributed_construction_example: -GraphStorm Processing Example -============================= +A GraphStorm Distributed Graph Construction Example +=================================================== -To demonstrate how to use the library locally we will +GraphStorm's distributed graph construction is involved with multiple steps. +To help users better understand these steps, we provide an example of distributed graph construction, +which can run locally in one instance. + +To demonstrate how to use distributed graph construction locally we will use the same example data as we use in our unit tests, which you can find in the project's repository, under ``graphstorm/graphstorm-processing/tests/resources/small_heterogeneous_graph``. -Install example dependencies ----------------------------- +Install dependencies +-------------------- -To run the local example you will need to install the GSProcessing +To run the local example you will need to install the GSProcessing and GraphStorm library to your Python environment, and you'll need to clone the -GraphStorm repository to get access to the data. +GraphStorm repository to get access to the data, and DGL tool for GSPartition. Follow the :ref:`gsp-installation-ref` guide to install the GSProcessing library. -You can clone the repository using +To run GSPartition job, you can install the dependencies as following: .. code-block:: bash + pip install graphstorm + pip install pydantic + pip install torch==2.1.0 --index-url https://download.pytorch.org/whl/cpu + pip install dgl==1.1.3 -f https://data.dgl.ai/wheels-internal/repo.html git clone https://github.com/awslabs/graphstorm.git + cd graphstorm + git clone --branch v1.1.3 https://github.com/dmlc/dgl.git You can then navigate to the ``graphstorm-processing/`` directory that contains the relevant data: .. code-block:: bash - cd ./graphstorm/graphstorm-processing/ + cd ./graphstorm-processing/ Expected file inputs and configuration -------------------------------------- +The example will include GSProcessing as the first step and GSPartition as the second step. + GSProcessing expects the input files to be in a specific format that will allow us to perform the processing and prepare the data for partitioning and training. +GSPartition then takes the output of GSProcessing to produce graph data in DistDGLGraph format for training or inference.. The data files are expected to be: @@ -202,8 +215,8 @@ For more details on the re-partitioning step see .. _gsp-examining-output: -Examining the job output ------------------------- +Examining the job output of GSProcessing +------------------------------------------ Once the processing and re-partitioning jobs are done, we can examine the outputs they created. The output will be @@ -281,28 +294,74 @@ in an ``edge_data`` directory. for node id 1 etc. -At this point you can use the DGL distributed partitioning pipeline -to partition your data, as described in the -`DGL documentation `_ -. +Run a GSPartition job locally +------------------------------ +While :ref:`GSPartition` is designed to run on a multi-machine cluster, +you can run GSPartition job locally for the example. Once you have completed the installation +and the GSProcessing example described in the previous section, you can proceed to run the GSPartition step. + +Assuming your working directory is ``graphstorm``, +you can use the following command to run the partition job locally: + +.. code:: bash + + echo 127.0.0.1 > ip_list.txt + python3 -m graphstorm.gpartition.dist_partition_graph \ + --input-path /tmp/gsprocessing-example/ \ + --metadata-filename updated_row_counts_metadata.json \ + --output-path /tmp/gspartition-example/ \ + --num-parts 2 \ + --dgl-tool-path ./dgl/tools \ + --partition-algorithm random \ + --ip-config ip_list.txt -To simplify the process of partitioning and training, without the need -to manage your own infrastructure, we recommend using GraphStorm's -`SageMaker wrappers `_ -that do all the hard work for you and allow -you to focus on model development. In particular you can follow the GraphStorm documentation to run -`distributed partitioning on SageMaker `_. +The command above will first do graph partitioning to determine the ownership for each partition and save the results. +Then it will do data dispatching to physically assign the partitions to graph data and dispatch them to each machine. +Finally it will generate the graph data ready for training/inference. +Examining the job output of GSPartition +--------------------------------------- -To run GSProcessing jobs on Amazon SageMaker we'll need to follow -:ref:`GSProcessing distributed setup` to set up our environment -and :ref:`Running GSProcessing on SageMaker` to execute the job. +Once the partition job is done, you can examine the outputs. + +.. code-block:: bash + $ cd /tmp/gspartition-example + $ ls -ltR + + dist_graph/ + metadata.json + |- part0/ + edge_feat.dgl + graph.dgl + node_feat.dgl + orig_eids.dgl + orig_nids.dgl + partition_assignment/ + director.txt + genre.txt + movie.txt + partition_meta.json + user.txt + +The ``dist_graph`` folder contains partitioned graph ready for training and inference. + +* ``part0``: As we only specify 1 partition in the previous command, we have one part folder here. +There are five files for the partition + * ``edge_feat.dgl``: The edge features for part 0 stored in binary format. + * ``graph.dgl``: The graph structure data for part 0 stored in binary format. + * ``node_feat.dgl``: The node features data for part 0 stored in binary format. + * ``orig_eids.dgl``: The mapping for edges between raw edge IDs and the partitioned graph edge IDs. + * ``orig_nids.dgl``: The mapping for nodes between raw node IDs and the partitioned graph node IDs. + +* ``metadata.json``: This file contains metadata about the distributed DGL graph. + +The ``partition_assignment`` directory contains different partition results for different node types, +which can reused for the `dgl dispatch pipeline `_ .. rubric:: Footnotes .. [#f1] Note that this is just a hint to the Spark engine, and it's not guaranteed that the number of output partitions will always match - the requested value. -.. [#f2] This doc will be future extended to include a partition example. \ No newline at end of file + the requested value. \ No newline at end of file diff --git a/docs/source/graph-construction/gs-processing/gspartition/ec2-clusters.rst b/docs/source/graph-construction/gs-processing/gspartition/ec2-clusters.rst new file mode 100644 index 0000000000..e12253cbcb --- /dev/null +++ b/docs/source/graph-construction/gs-processing/gspartition/ec2-clusters.rst @@ -0,0 +1,119 @@ +====================================== +Running partition jobs on EC2 Clusters +====================================== + +Once the :ref:`distributed processing` is completed, +users can start the partition jobs. This tutorial will provide instructions on how to setup an EC2 cluster and +start GSPartition jobs on it. + +Create a GraphStorm Cluster +---------------------------- + +Setup instances of a cluster +............................. +A cluster contains several instances, each of which runs a GraphStorm Docker container. Before creating a cluster, we recommend to +follow the :ref:`Environment Setup `. The guide shows how to build GraphStorm Docker images, and use a Docker container registry, +e.g. `AWS ECR `_ , to upload the GraphStorm image to an ECR repository, pull it on the instances in the cluster, +and finally start the image as a container. + +.. note:: + + If you are planning to use **parmetis** algorithm, please prepare your docker image using the following instructions: + + .. code-block:: bash + + git clone https://github.com/awslabs/graphstorm.git + + cd /path-to-graphstorm/docker/ + + bash /path-to-graphstorm/docker/build_docker_parmetis.sh /path-to-graphstorm/ image-name image-tag + + There are three positional arguments for ``build_docker_parmetis.sh``: + + 1. **path-to-graphstorm** (**required**), is the absolute path of the "graphstorm" folder, where you cloned the GraphStorm source code. For example, the path could be ``/code/graphstorm``. + 2. **image-name** (optional), is the assigned name of the Docker image to be built . Default is ``graphstorm``. + 3. **image-tag** (optional), is the assigned tag prefix of the Docker image. Default is ``local``. + +Setup a shared file system for the cluster +........................................... +A cluster requires a shared file system, such as NFS or `EFS `_, mounted to each instance in the cluster, in which all GraphStorm containers can share data files, save model artifacts and prediction results. + +`Here `_ is the instruction of setting up an NFS for a cluster. As the steps of setting an NFS could be various on different systems, we suggest users to look for additional information about NFS setting. Here are some available resources: `NFS tutorial `_ by DigitalOcean, `NFS document `_ for Ubuntu. + +For an AWS EC2 cluster, users can also use EFS as the shared file system. Please follow 1) `the instruction of creating EFS `_; 2) `the instruction of installing an EFS client `_; and 3) `the instructions of mounting the EFS filesystem `_ to set up EFS. + +After setting up a shared file system, we can keep all graph data in a shared folder. Then mount the data folder to the ``/path_to_data/`` of each instances in the cluster so that all GraphStorm containers can access the data. + +Run a GraphStorm container +........................... +In each instance, use the following command to start a GraphStorm Docker container and run it as a backend daemon on cpu. + +.. code-block:: shell + + docker run -v /path_to_data/:/data \ + -v /dev/shm:/dev/shm \ + --network=host \ + -d --name test graphstorm:local-cpu service ssh restart + +This command mounts the shared ``/path_to_data/`` folder to a container's ``/data/`` folder by which GraphStorm codes can access graph data and save the partition result. + +Setup the IP Address File and Check Port Status +---------------------------------------------------------- + +Collect the IP address list +........................... +The GraphStorm Docker containers use SSH on port ``2222`` to communicate with each other. Users need to collect all IP addresses of all the instances and put them into a text file, e.g., ``/data/ip_list.txt``, which is like: + +.. figure:: ../../../../../tutorial/distributed_ips.png + :align: center + +.. note:: We recommend to use **private IP addresses** on AWS EC2 cluster to avoid any possible port constraints. + +Put the IP list file into container's ``/data/`` folder. + +Check port +................ +The GraphStorm Docker container uses port ``2222`` to **ssh** to containers running on other machines without password. Please make sure the port is not used by other processes. + +Users also need to make sure the port ``2222`` is open for **ssh** commands. + +Pick one instance and run the following command to connect to the GraphStorm Docker container. + +.. code-block:: bash + + docker container exec -it test /bin/bash + +Users need to exchange the ssh key from each of GraphStorm Docker container to +the rest containers in the cluster: copy the keys from the ``/root/.ssh/id_rsa.pub`` from one container to ``/root/.ssh/authorized_keys`` in containers on all other containers. +In the container environment, users can check the connectivity with the command ``ssh -o StrictHostKeyChecking=no -p 2222``. Please replace the ```` with the real IP address from the ``ip_list.txt`` file above, e.g., + +.. code-block:: bash + + ssh 172.38.12.143 -o StrictHostKeyChecking=no -p 2222 + +If successful, you should login to the container with ip 172.38.12.143. + +If not, please make sure there is no restriction of exposing port 2222. + + +Launch GSPartition Jobs +----------------------- + +Now we can ssh into the **leader node** of the EC2 cluster, and start GSPartition process with the following command: + +.. code:: bash + + python3 -m graphstorm.gpartition.dist_partition_graph + --input-path ${LOCAL_INPUT_DATAPATH} \ + --metadata-filename ${METADATA_FILE} \ + --output-path ${LOCAL_OUTPUT_DATAPATH} \ + --num-parts ${NUM_PARTITIONS} \ + --partition-algorithm ${ALGORITHM} \ + --ip-config ${IP_CONFIG} + +.. warning:: + 1. Please make sure the both ``LOCAL_INPUT_DATAPATH`` and ``LOCAL_OUTPUT_DATAPATH`` are located on the shared filesystem. + 2. The number of instances in the cluster should be equal to ``NUM_PARTITIONS``. + 3. For users who only want to generate partition assignments instead of the partitioned DGL graph, please add ``--partition-assignment-only`` flag. + +Currently we support both ``random`` and ``parmetis`` as the partitioning algorithm for EC2 clusters. diff --git a/docs/source/graph-construction/gs-processing/gspartition/index.rst b/docs/source/graph-construction/gs-processing/gspartition/index.rst new file mode 100644 index 0000000000..1e4032175c --- /dev/null +++ b/docs/source/graph-construction/gs-processing/gspartition/index.rst @@ -0,0 +1,28 @@ +.. _gspartition_index: + +=================================== +Running partition jobs on AWS Infra +=================================== + +GraphStorm Distributed Graph Partition (GSPartition) allows users to do distributed partition on preprocessed graph data +prepared by :ref:`GSProcessing`. To enable distributed training, the preprocessed input data must be converted to a partitioned graph representation. +GSPartition allows user to handle massive graph data in distributed clusters. GSPartition is built on top of the +dgl `distributed graph partitioning pipeline `_. + +GSPartition consists of two steps: Graph Partitioning and Data Dispatching. Graph Partitioning step will assign each node to one partition +and save the results as a set of files called partition assignment. Data Dispatching step will physically partition the +graph data and dispatch them according to the partition assignment. It will generate the graph data in DGL format, ready for distributed training and inference. + +Tutorials for GSPartition are specifically prepared based on AWS infrastructure, +i.e., `Amazon SageMaker `_ and `Amazon EC2 clusters `_. +But, users can create your own clusters easily by following the GSPartition tutorial on Amazon EC2. + +The first section includes instructions on how to run GSPartition on `Amazon SageMaker `_. +The second section includes instructions on how to run GSPartition on `Amazon EC2 clusters `_. + +.. toctree:: + :maxdepth: 1 + :glob: + + sagemaker.rst + ec2-clusters.rst diff --git a/docs/source/graph-construction/gs-processing/gspartition/sagemaker.rst b/docs/source/graph-construction/gs-processing/gspartition/sagemaker.rst new file mode 100644 index 0000000000..946de73ea4 --- /dev/null +++ b/docs/source/graph-construction/gs-processing/gspartition/sagemaker.rst @@ -0,0 +1,134 @@ +========================================== +Running partition jobs on Amazon SageMaker +========================================== + +Once the :ref:`distributed processing` is complete, +you can use Amazon SageMaker launch scripts to launch distributed processing jobs with AWS resources. + +Build the Docker Image for GSPartition Jobs on Amazon SageMaker +--------------------------------------------------------------- +GSPartition job on Amazon SageMaker uses its SageMaker's **BYOC** (Bring Your Own Container) mode. + +Step 1: Build an Amazon SageMaker-compatible Docker image +.......................................................... + +.. note:: + * Please make sure your account has access key (AK) and security access key (SK) configured to authenticate accesses to AWS services, users can refer to `example policy `_. + * For more details of Amazon ECR operation via CLI, users can refer to the `Using Amazon ECR with the AWS CLI document `_. + +First, in a Linux machine, configure a Docker environment by following the `Docker documentation `_ suggestions. + +In order to use the Amazon SageMaker base Docker image, users need to refer the `DLC image command `_ +to find the specific Docker image commands. For example, below is the command for user authentication to access the Amazon SageMaker base Docker image. + +.. code-block:: bash + + aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-east-1.amazonaws.com + +.. note:: + For region other than ``us-east-1``, please refer to `available region `_ + +Secondly, clone GraphStorm source code, and build a GraphStorm SageMaker compatible Docker image from source with commands: + +.. code-block:: bash + + git clone https://github.com/awslabs/graphstorm.git + + cd /path-to-graphstorm/docker/ + + bash build_docker_sagemaker.sh ../ + +The ``build_docker_sagemaker.sh`` script takes four arguments: + +1. **path-to-graphstorm** (**required**), is the path of the ``graphstorm`` folder, where you cloned the GraphStorm source code. For example, the path could be ``/code/graphstorm``. +2. **DEVICE_TYPE**, is the intended device type of the to-be built Docker image. Please specify ``cpu`` for building CPU-compatible images for partition job. +3. **IMAGE_NAME** (optional), is the assigned name of the to-be built Docker image. Default is ``graphstorm``. + +.. warning:: + In order to upload the GraphStorm SageMaker Docker image to Amazon ECR, users need to define the to include the ECR URI string, **.dkr.ecr..amazonaws.com/**, e.g., ``888888888888.dkr.ecr.us-east-1.amazonaws.com/graphstorm``. + +4. **IMAGE_TAG** (optional), is the assigned tag name of the to-be built Docker image. Default is ``sm-``, + that is, ``sm-cpu`` for CPU images. + +Once the ``build_docker_sagemaker.sh`` command completes successfully, there will be a Docker image, named ``:``, +such as ``888888888888.dkr.ecr.us-east-1.amazonaws.com/graphstorm:sm-cpu``, in the local repository, which could be listed by running: + +.. code-block:: bash + + docker images graphstorm + +.. _upload_sagemaker_docker: + +Step 2: Upload Docker images to Amazon ECR repository +....................................................... +Because Amazon SageMaker relies on Amazon ECR to access customers' own Docker images, users need to upload the Docker images built in the Step 1 to their own ECR repository. + +The following command will authenticate the user account to access to user's ECR repository via AWS CLI. + +.. code-block:: bash + + aws ecr get-login-password --region | docker login --username AWS --password-stdin .dkr.ecr..amazonaws.com + +Please replace the `` and `` with your own account information and be consistent with the values used in the **Step 1**. + +In addition, users need to create an ECR repository at the specified `` with the name as `` **WITHOUT** the ECR URI string, e.g., ``graphstorm``. + +And then use the following command to push the built GraphStorm Docker image to users' own ECR repository. + +.. code-block:: bash + + docker push : + +Please replace the `` and `` with the actual Docker image name and tag, e.g., ``888888888888.dkr.ecr.us-east-1.amazonaws.com/graphstorm:sm-gpu``. + +Launch the GSPartition Job on Amazon SageMaker +----------------------------------------------- +For this example, we'll use an Amazon SageMaker cluster with 2 ``ml.t3.xlarge`` instances. +We assume the data is already on an AWS S3 bucket. +For large graphs, users can choose larger instances or more instances. + +Install dependencies +..................... +To run GraphStorm with the Amazon SageMaker service, users should install the Amazon SageMaker library and copy GraphStorm's SageMaker tools. + +1. Use the below command to install Amazon SageMaker. + +.. code-block:: bash + + pip install sagemaker + +2. Copy GraphStorm SageMaker tools. Users can clone the GraphStorm repository using the following command or copy the `sagemaker folder `_ to the instance. + +.. code-block:: bash + + git clone https://github.com/awslabs/graphstorm.git + +Launch GSPartition task +........................ +Users can use the following command to launch partition jobs. + +.. code:: bash + + python launch/launch_partition.py \ + --graph-data-s3 ${DATASET_S3_PATH} \ + --num-parts ${NUM_PARTITIONS} \ + --instance-count ${NUM_INSTANCES} \ + --output-data-s3 ${OUTPUT_PATH} \ + --instance-type ${INSTANCE_TYPE} \ + --image-url ${IMAGE_URI} \ + --region ${REGION} \ + --role ${ROLE} \ + --entry-point "run/partition_entry.py" \ + --metadata-filename ${METADATA_FILE} \ + --log-level INFO \ + --partition-algorithm ${ALGORITHM} + +.. warning:: + The ``NUM_INSTANCES`` should be equal to the ``NUM_PARTITIONS`` here. + +Running the above will take the dataset after GSProcessing +from ``${DATASET_S3_PATH}`` as input and create a DistDGL graph with +``${NUM_PARTITIONS}`` under the output path, ``${OUTPUT_PATH}``. +Currently we only support ``random`` as the partitioning algorithm for sagemaker. + + diff --git a/docs/source/graph-construction/gs-processing/index.rst b/docs/source/graph-construction/gs-processing/index.rst index ca239f9bc7..8635b00ffc 100644 --- a/docs/source/graph-construction/gs-processing/index.rst +++ b/docs/source/graph-construction/gs-processing/index.rst @@ -4,13 +4,25 @@ Distributed Graph Construction Beyond single-machine graph construction, distributed graph construction offers enhanced scalability and efficiency. This process involves two main steps: GraphStorm Distributed Data Processing (GSProcessing) -and GraphStorm Distributed Data Partitioning (GPartition). The documentations of GPartition will be released soon. +and GraphStorm Distributed Graph Partitioning (GSPartition). GSProcessing will preprocess the raw data into structured +datasets, and GSPartition will process these structured datasets to create multiple partitions in +`DGL format `_. + +Here is an overview of the workflow for distributed graph construction: + +* **Prepare input data**: GraphStorm Distributed Construction accepts tabular files in parquet/CSV format. +* **Run GSProcessing**: Use GSProcessing to process the input data. This step prepares the data for partitioning including edge and node data, transformation details, and node id mappings. +* **Run GSPartition**: Use GSPartition to partition the processed data into graph files suitable for distributed training. + +.. figure:: ../../../../tutorial/distributed_construction.png + :align: center The following sections provide guidance on doing distributed graph construction. The first section details the execution environment setup for GSProcessing. The second section offers examples on drafting a configuration file for a GSProcessing job. The third section explains how to deploy your GSProcessing job with AWS infrastructure. -The final section shows an example to quick start GSProcessing. +The fourth section includes how to do GSPartition with the GSProcessing output. +The final section shows an example to quickly start GSProcessing and GSPartition. .. toctree:: :maxdepth: 1 @@ -19,4 +31,5 @@ The final section shows an example to quick start GSProcessing. prerequisites/index.rst input-configuration.rst aws-infra/index.rst + gspartition/index.rst example.rst \ No newline at end of file diff --git a/docs/source/graph-construction/gs-processing/prerequisites/index.rst b/docs/source/graph-construction/gs-processing/prerequisites/index.rst index 1e1cf534f7..7951c26994 100644 --- a/docs/source/graph-construction/gs-processing/prerequisites/index.rst +++ b/docs/source/graph-construction/gs-processing/prerequisites/index.rst @@ -1,3 +1,5 @@ +.. _gsprocessing_prerequisites_index: + =============================================== Distributed GraphStorm Processing =============================================== diff --git a/docs/source/graph-construction/index.rst b/docs/source/graph-construction/index.rst index b917c814f6..4c5d8403f5 100644 --- a/docs/source/graph-construction/index.rst +++ b/docs/source/graph-construction/index.rst @@ -1,3 +1,5 @@ +.. _graph_construction: + ================== Graph Construction ================== diff --git a/graphstorm-processing/docker/0.3.1/emr-serverless/Dockerfile.cpu b/graphstorm-processing/docker/0.3.1/emr-serverless/Dockerfile.cpu index b3b511f6b4..e8db91b4bc 100644 --- a/graphstorm-processing/docker/0.3.1/emr-serverless/Dockerfile.cpu +++ b/graphstorm-processing/docker/0.3.1/emr-serverless/Dockerfile.cpu @@ -1,5 +1,5 @@ ARG ARCH=x86_64 -FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:20240206-${ARCH} as base +FROM public.ecr.aws/emr-serverless/spark/emr-7.1.0:20240528-${ARCH} as base USER root ENV PYTHON_VERSION=3.9.18 @@ -40,6 +40,8 @@ else \ python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \ fi +# We use this file as an indicator of the execution environment +RUN touch /usr/lib/spark/code/EMR_SERVERLESS_EXECUTION # GSProcessing codebase COPY code/ /usr/lib/spark/code/ diff --git a/graphstorm-processing/graphstorm_processing/distributed_executor.py b/graphstorm-processing/graphstorm_processing/distributed_executor.py index 0b2e6e5b21..c374056f56 100644 --- a/graphstorm-processing/graphstorm_processing/distributed_executor.py +++ b/graphstorm-processing/graphstorm_processing/distributed_executor.py @@ -573,10 +573,10 @@ def main(): format="[GSPROCESSING] %(asctime)s %(levelname)-8s %(message)s", ) - # Determine if we're running within a SageMaker container + # Determine execution environment if os.path.exists("/opt/ml/config/processingjobconfig.json"): execution_env = ExecutionEnv.SAGEMAKER - elif os.path.exists("/emr-serverless-config.json"): + elif os.path.exists("/usr/lib/spark/code/EMR_SERVERLESS_EXECUTION"): execution_env = ExecutionEnv.EMR_SERVERLESS elif os.path.exists("/usr/lib/spark/code/EMR_EXECUTION"): execution_env = ExecutionEnv.EMR_ON_EC2 diff --git a/inference_scripts/mt_infer/ml_nc_lp_norm_with_mask_infer.yaml b/inference_scripts/mt_infer/ml_nc_lp_norm_with_mask_infer.yaml new file mode 100644 index 0000000000..53e2d267dc --- /dev/null +++ b/inference_scripts/mt_infer/ml_nc_lp_norm_with_mask_infer.yaml @@ -0,0 +1,59 @@ +--- +version: 1.0 +gsf: + basic: + backend: gloo + verbose: false + save_perf_results_path: null + batch_size: 32 + node_feat_name: + - user:feat + - movie:title + gnn: + model_encoder_type: rgcn + num_layers: 1 + hidden_size: 32 + use_mini_batch_infer: true + input: + restore_model_path: null + output: + save_model_path: null + save_embed_path: null + hyperparam: + dropout: 0. + lr: 0.001 + no_validation: false + rgcn: + num_bases: -1 + use_self_loop: true + use_node_embeddings: false + multi_task_learning: + - node_classification: + target_ntype: "movie" + label_field: "label" + multilabel: false + num_classes: 19 + batch_size: 16 # will overwrite the global batch_size + mask_fields: + - "train_mask_c0" # node classification mask 0 + - "val_mask_c0" + - "test_mask_c0" + eval_metric: + - "accuracy" + - link_prediction: + lp_loss_func: "contrastive" + num_negative_edges: 4 + num_negative_edges_eval: 100 + train_negative_sampler: joint + eval_etype: + - "user,rating,movie" + train_etype: + - "user,rating,movie" + exclude_training_targets: true + reverse_edge_types_map: + - user,rating,rating-rev,movie + batch_size: 128 # will overwrite the global batch_size + mask_fields: + - "train_mask_field_lp" + - null # empty means there is no validation mask + - "test_mask_field_lp" \ No newline at end of file diff --git a/python/graphstorm/dataloading/dataloading.py b/python/graphstorm/dataloading/dataloading.py index 1020529857..d3c9728770 100644 --- a/python/graphstorm/dataloading/dataloading.py +++ b/python/graphstorm/dataloading/dataloading.py @@ -117,7 +117,7 @@ class MultiLayerNeighborSamplerForReconstruct(dgl.dataloading.BlockSampler): construct_feat_ntype : list of str The node types that requires to construct node features. construct_feat_fanout : int - The fanout required to construct node features. + The fanout used when constructing node features for feature-less nodes. """ def __init__(self, sampler, dataset, construct_feat_ntype, construct_feat_fanout): super().__init__() @@ -151,38 +151,45 @@ def sample_blocks(self, g, seed_nodes, exclude_eids=None): class GSgnnEdgeDataLoaderBase(): """ The base dataloader class for edge tasks. - If users want to customize the dataloader for edge prediction tasks + If users want to customize dataloaders for edge prediction tasks, they should extend this base class by implementing the special methods - `__iter__` and `__next__`. + ``__iter__``, ``__next__``, and ``__len__``. Parameters ---------- dataset : GSgnnData - The dataset for the edge task. + The GraphStorm data for edge tasks. target_idx : dict of Tensors - The target edge IDs. + The target edge indexes for prediction. fanout : list or dict of lists - The fanout for each GNN layer. + The fanout for each GNN layer. If it's a dict of lists, it indicates the fanout for each + edge type. label_field: str or dict of str Label field of the edge task. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - decoder_edge_feats: str or dict of list of str - Edge features used in decoder. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + decoder_edge_feats: str, or dict of list of str + Edge feature fileds used in edge decoders in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. """ def __init__(self, dataset, target_idx, fanout, label_field, node_feats=None, edge_feats=None, decoder_edge_feats=None): @@ -199,154 +206,168 @@ def __init__(self, dataset, target_idx, fanout, self._decoder_edge_feats = decoder_edge_feats def __iter__(self): - """ Returns an iterator object + """ Returns an iterator object. """ def __next__(self): """ Return a mini-batch data for the edge task. - A mini-batch comprises three objects: the input node IDs, - the target edges and the subgraph blocks for message passing. + A mini-batch comprises three objects: 1) the input node IDs, + 2) the target edges, and 3) the subgraph blocks for message passing. Returns ------- - dict of Tensors : the input node IDs of the mini-batch. - DGLGraph : the target edges. - list of DGLGraph : the subgraph blocks for message passing. + + - dict of Tensors : the input node IDs of the mini-batch. + - DGLGraph : the target edges. + - list of DGLGraph : the subgraph blocks for message passing. + """ def __len__(self): - """ Return the length (number of mini-batches) of the data loader + """ Return the length (number of mini-batches) of the data loader. Returns + ------- int: length """ @property def data(self): - """ The dataset of this dataloader. + """ The dataset of this dataloader, which is given in class initialization. Returns ------- - GSgnnData : The dataset of the dataloader. + GSgnnData: The dataset of the dataloader. """ return self._data @property def target_eidx(self): - """ Target edge idx for prediction + """ Target edge indexes for prediction, which is given in class initialization. Returns ------- - dict of Tensors : the target edge IDs. + dict of Tensors: the target edge IDs, which is given in class initialization. """ return self._target_eidx @property def fanout(self): - """ The fan out of each GNN layers + """ The fan out of each GNN layers, which is given in class initialization. Returns ------- - list or a dict of list : the fanouts for each GNN layer. + list or a dict of list: the fanouts for each GNN layer, which is given in class + initialization. """ return self._fanout @property def label_field(self): - """ The label field + """ The label field, which is given in class initialization. Returns ------- - str: Label fields in the graph. + str: Label fields in the graph, which is given in class initialization. """ return self._label_field @property def node_feat_fields(self): - """ Node features + """ Node feature fields, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str or dict of list of str: Node feature fields in the graph, which is given in class + initialization. """ return self._node_feats @property def edge_feat_fields(self): - """ Edge features + """ Edge feature fields, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str or dict of list of str: Node feature fields in the graph, which is given in class + initialization. """ return self._edge_feats @property def decoder_edge_feat_fields(self): - """ Edge features for edge decoder. + """ Edge features for edge decoder, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str or dict of list of str: Node feature fields in the graph, which is given in class + initialization. """ return self._decoder_edge_feats class GSgnnEdgeDataLoader(GSgnnEdgeDataLoaderBase): - """ The minibatch dataloader for edge prediction + """ The mini-batch dataloader for edge prediction tasks. - GSgnnEdgeDataLoader samples GraphStorm edge dataset into an iterable over mini-batches - of samples. Both source and destination nodes are included in the batch_graph, which + ``GSgnnEdgeDataLoader`` samples target edges into an iterable over mini-batches + of samples. Both source and destination nodes are included in the ``batch_graph``, which will be used by GraphStorm Trainers and Inferrers. Parameters ------------ dataset: GSgnnData - The GraphStorm edge dataset + The GraphStorm data. target_idx : dict of Tensors - The target edges for prediction - fanout: list of int or dict of list - Neighbor sample fanout. If it's a dict, it indicates the fanout for each edge type. + The target edge indexes for prediction. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. batch_size: int - Batch size + Mini-batch size. label_field: str or dict of str Label field of the edge task. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - decoder_edge_feats: str or dict of list of str - Edge features used in decoder. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge features fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + decoder_edge_feats: str, or dict of list of str + Edge features used in edge decoders in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. train_task : bool - Whether or not for training. + Whether or not is the dataloader for training. reverse_edge_types_map: dict - A map for reverse edge type + A map for reverse edge type. exclude_training_targets: bool - Whether to exclude training edges during neighbor sampling + Whether to exclude training edges during neighbor sampling. remove_target_edge_type: bool - Whether we will exclude all edges of the target edge type in message passing. + Whether to exclude all edges of the target edge type in message passing. construct_feat_ntype : list of str The node types that requires to construct node features. construct_feat_fanout : int - The fanout required to construct node features. + The fanout used when constructing node features for feature-less nodes. Examples ------------ To train a 2-layer GNN for edge prediction on a set of edges ``target_idx`` on - a graph where each nodes takes messages from 15 neighbors on the first layer - and 10 neighbors on the second. + a graph where each edge (source and destination node pair) takes messages from 15 + neighbors on the first layer and 10 neighbors on the second. .. code:: python @@ -443,29 +464,17 @@ def __next__(self): return self.dataloader.__next__() def __len__(self): - # Follow - # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116 - # In DGL, DistDataLoader.expected_idxs is the length (number of batches) - # of the datalaoder. - return self.dataloader.expected_idxs - - @property - def data(self): - """ The dataset of this dataloader. - """ - return self._data - - @property - def target_eidx(self): - """ Target edge idx for prediction """ - return self._target_eidx + Follow + https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116. + In DGL, ``DistDataLoader.expected_idxs`` is the length (number of batches) + of the dataloader. - @property - def fanout(self): - """ The fan out of each GNN layers + Returns: + -------- + int: The length (number of batches) of the dataloader. """ - return self._fanout + return self.dataloader.expected_idxs ################ Minibatch DataLoader (Link Prediction) ####################### @@ -483,36 +492,41 @@ def fanout(self): BUILTIN_LP_FIXED_NEG_SAMPLER = 'fixed' class GSgnnLinkPredictionDataLoaderBase(): - """ The base class of link prediction dataloader. + """ The base dataloader class for link prediction tasks. - If users want to customize the dataloader for link prediction tasks + If users want to customize dataloaders for link prediction tasks, they should extend this base class by implementing the special methods - `__iter__` and `__next__`. + ``__iter__``, ``__next__``, and ``__len__``. Parameters ---------- dataset: GSgnnData - The GraphStorm edge dataset + The GraphStorm data for link prediction tasks. target_idx : dict of Tensors - The target edges for prediction - fanout: list of int or dict of list - Neighbor sample fanout. If it's a dict, it indicates the fanout for each edge type. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - pos_graph_edge_feats: str or dist of list of str + The target edge indexes for link prediction. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + pos_graph_edge_feats: str, or dict of list of str The field of the edge features used by positive graph in link prediction. - For example edge weight. - Default: None + For example edge weights. + Default: None. """ def __init__(self, dataset, target_idx, fanout, node_feats=None, edge_feats=None, pos_graph_edge_feats=None): @@ -527,36 +541,40 @@ def __init__(self, dataset, target_idx, fanout, self._pos_graph_edge_feats = pos_graph_edge_feats def __iter__(self): - """ Returns an iterator object + """ Returns an iterator object. """ def __next__(self): """ Return a mini-batch for link prediction. A mini-batch of link prediction contains four objects: - * the input node IDs of the mini-batch, - * the target positive edges for prediction, - * the negative edges for prediction, - * the subgraph blocks for message passing. + + - the input node IDs of the mini-batch. + - the target positive edges for prediction. + - the sampled negative edges for prediction. + - the subgraph blocks for message passing. Returns ------- - Tensor or dict of Tensors : the input nodes of a mini-batch. - DGLGraph : positive edges. - DGLGraph : negative edges. - list of DGLGraph : subgraph blocks for message passing. + + - Tensor or dict of Tensors: the input nodes of a mini-batch. + - DGLGraph: positive edges. + - DGLGraph: negative edges. + - list of DGLGraph: subgraph blocks for message passing. + """ def __len__(self): - """ Return the length (number of mini-batches) of the data loader + """ Return the length (number of mini-batches) of the data loader. Returns + ------- int: length """ @property def data(self): - """ The dataset of this dataloader. + """ The dataset of this dataloader, which is given in class initialization. Returns ------- @@ -566,7 +584,7 @@ def data(self): @property def fanout(self): - """ The fan out of each GNN layers + """ The fan out of each GNN layers, which is given in class initialization. Returns ------- @@ -576,7 +594,7 @@ def fanout(self): @property def target_eidx(self): - """ The target edges for prediction. + """ The target edge indexes for prediction, which is given in class initialization. Returns ------- @@ -586,7 +604,7 @@ def target_eidx(self): @property def node_feat_fields(self): - """ Node features + """ Node feature fields, which is given in class initialization. Returns ------- @@ -596,85 +614,92 @@ def node_feat_fields(self): @property def edge_feat_fields(self): - """ Edge features + """ Edge feature fields, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str or dict of list of str: Edge feature fields in the graph. """ return self._edge_feats @property - def pos_graph_feat_fields(self): - """ Get edge feature fields of positive graphs + def pos_graph_edge_feat_fields(self): + """ Get edge feature fields of positive graphs, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str or dict of list of str: Edge feature fields in the positive graph. """ return self._pos_graph_edge_feats class GSgnnLinkPredictionDataLoader(GSgnnLinkPredictionDataLoaderBase): - """ Link prediction minibatch dataloader + """ Mini-batch dataloader for link prediction. - GSgnnLinkPredictionDataLoader samples GraphStorm edge dataset into an iterable over mini-batches - of samples. In each batch, pos_graph and neg_graph are sampled subgraph for positive and - negative edges, which will be used by GraphStorm Trainers and Inferrers. Given a positive edge, - a negative edge is composed of the source node and a random negative destination nodes - according to a uniform distribution. + ``GSgnnLinkPredictionDataLoader`` samples GraphStorm data into an iterable over mini-batches + of samples. In each batch, ``pos_graph`` and ``neg_graph`` are sampled subgraph for positive + and negative edges, which will be used by GraphStorm Trainers and Inferrers. + + Given a positive edge, a negative edge is composed of the source node and a random negative + destination nodes according to a uniform distribution. Argument -------- dataset: GSgnnData - The GraphStorm edge dataset + The GraphStorm data. target_idx : dict of Tensors - The target edges for prediction - fanout: list of int or dict of list - Neighbor sample fanout. If it's a dict, it indicates the fanout for each edge type. + The target edge indexes for prediction. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. batch_size: int - Batch size + Mini-batch size. num_negative_edges: int - The number of negative edges per positive edge - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - pos_graph_edge_feats: str or dist of list of str - The field of the edge features used by positive graph in link prediction. + The number of negative edges per positive edge. + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + pos_graph_edge_feats: str, or dict of list of str + The edge feature fields used by positive graph in link prediction. For example edge weight. - Default: None + Default: None. train_task : bool - Whether or not for training. + Whether or not it is a dataloader for training. reverse_edge_types_map: dict - A map for reverse edge type + A map for reverse edge type. exclude_training_targets: bool - Whether to exclude training edges during neighbor sampling + Whether to exclude training edges during neighbor sampling. edge_mask_for_gnn_embeddings : str - The mask that indicates the edges used for computing GNN embeddings. By default, + The mask indicates the edges used for computing GNN embeddings. By default, the dataloader uses the edges in the training graphs to compute GNN embeddings to avoid information leak for link prediction. construct_feat_ntype : list of str The node types that requires to construct node features. construct_feat_fanout : int - The fanout required to construct node features. - edge_dst_negative_field: str or dict of str - The feature field(s) that store the hard negative edges for each edge type. - num_hard_negs: int or dict of int - The number of hard negatives per positive edge for each edge type + The fanout used when constructing node features for feature-less nodes. + edge_dst_negative_field: str, or dict of str + The feature fields that store the hard negative edges for each edge type. + num_hard_negs: int, or dict of int + The number of hard negatives per positive edge for each edge type. Examples ------------ To train a 2-layer GNN for link prediction on a set of positive edges ``target_idx`` on - a graph where each nodes takes messages from 15 neighbors on the first layer - and 10 neighbors on the second. We use 10 negative edges per positive in this example. + a graph where each edge (a source and destination node pair) takes messages from 15 neighbors + on the first layer and 10 neighbors on the second. + We use 10 negative edges per positive in this example. .. code:: python @@ -785,10 +810,16 @@ def __next__(self): return self.dataloader.__next__() def __len__(self): - # Follow - # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116 - # In DGL, DistDataLoader.expected_idxs is the length (number of batches) - # of the datalaoder. + """ + Follow + https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116. + In DGL, ``DistDataLoader.expected_idxs`` is the length (number of batches) + of the dataloader. + + Returns: + -------- + int: The length (number of batches) of the dataloader. + """ return self.dataloader.expected_idxs class GSgnnLPJointNegDataLoader(GSgnnLinkPredictionDataLoader): @@ -929,7 +960,7 @@ def _prepare_negative_sampler(self, num_negative_edges): class AllEtypeDistEdgeDataLoader(DistDataLoader): """ Distributed edge data sampler that samples at least one - edge for each edge type in a minibatch + edge for each edge type in a mini-batch Parameters ---------- @@ -978,7 +1009,7 @@ def _reinit_dataset(self): bs_per_type = {} for etype, idxs in self.data_idx.items(): # compute the number of edges to be sampled for - # each edge type in a minibatch. + # each edge type in a mini-batch. # If batch_size * num_edges / total_edges < 0, then set 1. # # Note: The resulting batch size of a mini batch may be larger @@ -1066,7 +1097,7 @@ def _next_data(self): return new_ret class GSgnnAllEtypeLinkPredictionDataLoader(GSgnnLinkPredictionDataLoader): - """ Link prediction minibatch dataloader. In each minibatch, + """ Link prediction mini-batch dataloader. In each mini-batch, at least one edge is sampled from each etype. Note: using this dataloader with a graph with massive etypes @@ -1147,15 +1178,15 @@ def __next__(self): def __len__(self): # Follow - # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116 + # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116. # In DGL, DistDataLoader.expected_idxs is the length (number of batches) - # of the datalaoder. + # of the dataloader. # AllEtypeDistEdgeDataLoader is a child class of DistDataLoader. return self.dataloader.expected_idxs class GSgnnAllEtypeLPJointNegDataLoader(GSgnnAllEtypeLinkPredictionDataLoader): """ Link prediction dataloader with joint negative sampler. - In each minibatch, at least one edge is sampled from each etype. + In each mini-batch, at least one edge is sampled from each etype. """ @@ -1165,47 +1196,52 @@ def _prepare_negative_sampler(self, num_negative_edges): return negative_sampler class GSgnnLinkPredictionTestDataLoader(GSgnnLinkPredictionDataLoaderBase): - """ Link prediction minibatch dataloader for validation and test. + """ Mini-batch dataloader for link prediction validation and test. In order to efficiently compute positive and negative scores for - link prediction tasks, GSgnnLinkPredictionTestDataLoader is designed - to only generates edges, i.e., (src, dst) pairs. + link prediction tasks, ``GSgnnLinkPredictionTestDataLoader`` is designed + to only generates edges, i.e., source and destination node pairs. The negative edges are sampled uniformly. Parameters ----------- dataset: GSgnnData - The GraphStorm edge dataset + The GraphStorm data. target_idx : dict of Tensors - The target edges for prediction + The target edge indexes for link prediction. batch_size: int - Batch size + Mini-batch size. num_negative_edges: int - The number of negative edges per positive edge - fanout: int - Evaluation fanout for computing node embedding + The number of negative edges per positive edge. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. fixed_test_size: int Fixed number of test data used in evaluation. If it is none, use the whole testset. - When test is huge, using fixed_test_size + When test is huge, using `fixed_test_size` can save validation and test time. Default: None. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - pos_graph_edge_feats: str or dist of list of str - The field of the edge features used by positive graph in link prediction. + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + pos_graph_edge_feats: str or dict of list of str + The edge feature fields used by positive graph in link prediction. For example edge weight. - Default: None + Default: None. """ def __init__(self, dataset, target_idx, batch_size, num_negative_edges, fanout=None, fixed_test_size=None, @@ -1285,15 +1321,10 @@ def __len__(self): num_iters += math.ceil(test_size / self._batch_size) return num_iters - @property - def fanout(self): - """ Get eval fanout - """ - return self._fanout class GSgnnLinkPredictionJointTestDataLoader(GSgnnLinkPredictionTestDataLoader): - """ Link prediction minibatch dataloader for validation and test - with joint negative sampler + """ Mini-batch dataloader for Link prediction validation and test set + with joint negative sampler. """ def _prepare_negative_sampler(self, num_negative_edges): @@ -1303,43 +1334,48 @@ def _prepare_negative_sampler(self, num_negative_edges): return negative_sampler class GSgnnLinkPredictionPredefinedTestDataLoader(GSgnnLinkPredictionTestDataLoader): - """ Link prediction minibatch dataloader for validation and test - with predefined negatives. + """ Mini-batch dataloader for link prediction validation and test + with predefined negatives. Parameters ----------- dataset: GSgnnData - The GraphStorm edge dataset + The GraphStorm data. target_idx : dict of Tensors - The target edges for prediction + The target edge indexes for link prediction. batch_size: int - Batch size - fanout: int - Evaluation fanout for computing node embedding + Mini-batch size. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. fixed_test_size: int Fixed number of test data used in evaluation. If it is none, use the whole testset. - When test is huge, using fixed_test_size + When test is huge, using `fixed_test_size` can save validation and test time. Default: None. - fixed_edge_dst_negative_field: str or list of str - The feature field(s) that store the fixed negative set for each edge. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None - pos_graph_edge_feats: str or dist of list of str - The field of the edge features used by positive graph in link prediction. + fixed_edge_dst_negative_field: str, or list of str + The feature fields that store the fixed negative set for each edge. + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. + pos_graph_edge_feats: str, or dict of list of str + The edge feature fields used by positive graph in link prediction. For example edge weight. - Default: None + Default: None. """ def __init__(self, dataset, target_idx, batch_size, fixed_edge_dst_negative_field, fanout=None, fixed_test_size=None, @@ -1378,32 +1414,36 @@ def _next_data(self, etype): class GSgnnNodeDataLoaderBase(): """ The base dataloader class for node tasks. - If users want to customize the dataloader for node prediction tasks + If users want to customize dataloaders for their node prediction tasks, they should extend this base class by implementing the special methods - `__iter__` and `__next__`. + ``__iter__``, ``__next__``, and ``__len__``. Parameters ---------- dataset : GSgnnData - The dataset for the node task. + The GraphStorm data for node tasks. target_idx : dict of Tensors - The target node IDs. - fanout : list or dict of lists + The target node indexes for prediction. + fanout : list of int, or dict of lists The fanout for each GNN layer. - label_field: str or dict of str - Label field of the node task. - node_feats: str, or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None + label_field: str, or dict of str + Label field name of the target node types. + node_feats: str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. """ def __init__(self, dataset, target_idx, fanout, label_field, node_feats=None, edge_feats=None): @@ -1418,132 +1458,140 @@ def __init__(self, dataset, target_idx, fanout, self._edge_feats = edge_feats def __iter__(self): - """ Returns an iterator object + """ Returns an iterator object. """ def __next__(self): - """ Return a mini-batch data for the node task. + """ Return a mini-batch data for node tasks. - A mini-batch comprises three objects: the input node IDs of the mini-batch, - the target nodes and the subgraph blocks for message passing. + A mini-batch comprises three objects: 1) the input node IDs of the mini-batch, + 2) the target nodes, and 3) the subgraph blocks for message passing. Returns ------- - dict of Tensors : the input node IDs of the mini-batch. - dict of Tensors : the target node IDs. - list of DGLGraph : the subgraph blocks for message passing. + + - dict of Tensors : the input node IDs of the mini-batch. + - dict of Tensors : the target node indexes. + - list of DGLGraph : the subgraph blocks for message passing. + """ def __len__(self): - """ Return the length (number of mini-batches) of the data loader + """ Return the length (number of mini-batches) of the dataloader. Returns + ------- int: length """ @property def data(self): - """ The dataset of this dataloader. + """ The data of the dataloader, which is given in class initialization. Returns ------- - GSgnnData : The dataset of the dataloader. + GSgnnData : The data of the dataloader. """ return self._data @property def target_nidx(self): - """ Target edge idx for prediction + """ Target edge indexes for prediction , which is given in class initialization. Returns ------- - dict of Tensors : the target edge IDs. + dict of Tensors : the target edge indexes. """ return self._target_idx @property def fanout(self): - """ The fan out of each GNN layers + """ The fan out of each GNN layers , which is given in class initialization. Returns ------- - list or a dict of list : the fanouts for each GNN layer. + list or a dict of list : the fanouts for each GNN layer , which is given in class + initialization. """ return self._fanout @property def label_field(self): - """ The label field + """ The label field, which is given in class initialization. Returns ------- - str: Label fields in the graph. + str, or dict of str: Label fields, which is given in class initialization. """ return self._label_field @property def node_feat_fields(self): - """ Node features + """ Node features fileds, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str, or dict of list of str: Node feature fields, which is given in class initialization. """ return self._node_feats @property def edge_feat_fields(self): - """ Edge features + """ Edge features fields, which is given in class initialization. Returns ------- - str or dict of list of str: Node feature fields in the graph. + str, or dict of list of str: Edge feature fields, which is given in class initialization. """ return self._edge_feats class GSgnnNodeDataLoader(GSgnnNodeDataLoaderBase): - """ Minibatch dataloader for node tasks + """ Mini-batch dataloader for node tasks. - GSgnnNodeDataLoader samples GraphStorm node dataset into an iterable over mini-batches of - samples including target nodes and sampled neighbor nodes, which will be used by GraphStorm + ``GSgnnNodeDataLoader`` samples GraphStorm data into an iterable over mini-batches of + samples, including target nodes and sampled neighbor nodes, which will be used by GraphStorm Trainers and Inferrers. Parameters ---------- dataset: GSgnnData - The GraphStorm dataset + The GraphStorm data. target_idx : dict of Tensors - The target nodes for prediction - fanout: list of int or dict of list - Neighbor sample fanout. If it's a dict, it indicates the fanout for each edge type. + The target node indexes for prediction. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. label_field: str Label field of the node task. - (TODO:xiangsx) Support list of str for single dataloader multiple node tasks. - node_feats: str, list of str or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. - Default: None - edge_feats: str, list of str or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. - Default: None + node_feats: str, list of str or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + + Default: None. + edge_feats: str, list of str or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + + Default: None. batch_size: int - Batch size + Mini-batch size. train_task : bool - Whether or not for training. + Whether or not it is the dataloader for training. construct_feat_ntype : list of str The node types that requires to construct node features. construct_feat_fanout : int - The fanout required to construct node features. + The fanout used when constructing node features for feature-less nodes. Examples ---------- To train a 2-layer GNN for node classification on a set of nodes ``target_idx`` on - a graph where each nodes takes messages from 15 neighbors on the first layer + a graph where each node takes messages from 15 neighbors on the first layer and 10 neighbors on the second. .. code:: python @@ -1614,48 +1662,57 @@ def __next__(self): return self.dataloader.__next__() def __len__(self): - # Follow - # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116 - # In DGL, DistDataLoader.expected_idxs is the length (number of batches) - # of the datalaoder. + """ Follow the + https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116. + In DGL, ``DistDataLoader.expected_idxs`` is the length (number of batches) + of the dataloader. + + Returns: + -------- + int: The length (number of batches) of the dataloader. + """ return self.dataloader.expected_idxs class GSgnnNodeSemiSupDataLoader(GSgnnNodeDataLoader): - """ Semisupervised Minibatch dataloader for node tasks + """ Semi-supervised mini-batch dataloader for node tasks. Parameters ---------- dataset: GSgnnData - The GraphStorm dataset + The GraphStorm data. target_idx : dict of Tensors - The target nodes for prediction + The target node indexes for prediction. unlabeled_idx : dict of Tensors - The unlabeled nodes for semi-supervised training - fanout: list of int or dict of list - Neighbor sample fanout. If it's a dict, it indicates the fanout for each edge type. + The unlabeled node indexes for semi-supervised training. + fanout: list of int, or dict of list + Neighbor sampling fanout. If it's a dict of list, it indicates the fanout for each + edge type. batch_size: int - Batch size, the sum of labeled and unlabeled nodes + Mini-batch size, the sum of labeled and unlabeled nodes label_field: str Label field of the node task. - (TODO:xiangsx) Support list of str for single dataloader multiple node tasks. - node_feats: str, list of str or dist of list of str - Node features. - str: All the nodes have the same feature name. - list of string: All the nodes have the same list of features. - dist of list of string: Each node type have different set of node features. + node_feats: str, list of str, or dict of list of str + Node feature fileds in three possible formats: + + - string: All nodes have the same feature name. + - list of string: All nodes have the same list of features. + - dict of list of string: Each node type have different set of node features. + Default: None - edge_feats: str, list of str or dist of list of str - Edge features. - str: All the edges have the same feature name. - list of string: All the edges have the same list of features. - dist of list of string: Each edge type have different set of edge features. + edge_feats: str, list of str, or dict of list of str + Edge feature fileds in three possible formats: + + - string: All edges have the same feature name. + - list of string: All edges have the same list of features. + - dict of list of string: Each edge type have different set of edge features. + Default: None train_task : bool - Whether or not for training. + Whether or not it is the dataloader for training. construct_feat_ntype : list of str The node types that requires to construct node features. construct_feat_fanout : int - The fanout required to construct node features. + The fanout used when constructing node features for feature-less nodes. """ def __init__(self, dataset, target_idx, unlabeled_idx, fanout, batch_size, label_field, @@ -1683,12 +1740,17 @@ def __next__(self): return self.dataloader.__next__(), self.unlabeled_dataloader.__next__() def __len__(self): - # Follow - # https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116 - # In DGL, DistDataLoader.expected_idxs is the length (number of batches) - # of the datalaoder. - # As it uses two dataloader, either one throws - # an End of Iter error will stop the dataloader. + """ + Follow the + https://github.com/dmlc/dgl/blob/1.0.x/python/dgl/distributed/dist_dataloader.py#L116. + In DGL, ``DistDataLoader.expected_idxs`` is the length (number of batches) + of the dataloader. As it uses two dataloader, either one throws an End of Iter error + will stop the dataloader. + + Returns: + -------- + int: The length (number of batches) of the dataloader. + """ return min(self.dataloader.expected_idxs, self.unlabeled_dataloader.expected_idxs) diff --git a/python/graphstorm/dataloading/dataset.py b/python/graphstorm/dataloading/dataset.py index 2915000a38..bb94e5bcbf 100644 --- a/python/graphstorm/dataloading/dataset.py +++ b/python/graphstorm/dataloading/dataset.py @@ -155,29 +155,28 @@ def prepare_batch_edge_input(g, input_edges, return feat class GSgnnData(): - """ The GraphStorm data + """ The GraphStorm data class. Parameters ---------- part_config : str - The path of the partition configuration file. + The path of the partition configuration JSON file. node_feat_field: str or dict of list of str - The fields of the node features that will be encoded by GSNodeInputLayer. + The fields of the node features that will be encoded by ``GSNodeInputLayer``. It's a dict if different node types have different feature names. - Default: None + Default: None. edge_feat_field : str or dict of list of str - The fields of the edge features. - It's a dict if different edge types have - different feature names. - This argument is reserved by future usage. - Default: None + The fields of the edge features. It's a dict, if different edge types have + different feature names. This argument is reserved for future usage when the + ``GSEdgeInputLayer`` is implemented. + Default: None. lm_feat_ntypes : list of str The node types that contains text features. - Default: None + Default: None. lm_feat_etypes : list of tuples The edge types that contains text features. - Default: None + Default: None. """ def __init__(self, part_config, node_feat_field=None, edge_feat_field=None, @@ -258,24 +257,26 @@ def __init__(self, part_config, node_feat_field=None, edge_feat_field=None, @property def g(self): - """ The distributed graph. + """ The distributed graph loaded using information in the given part_config JSON file. """ return self._g @property def graph_name(self): - """ The graph name + """ The distributed graph's name extracted from the given part_config JSON file. """ return self._graph_name @property def node_feat_field(self): - """The field of node feature""" + """ The fields of node features given in initialization. + """ return self._node_feat_field @property def edge_feat_field(self): - """the field of edge feature""" + """ The fields of edge features given in initialization. + """ return self._edge_feat_field def _check_node_feats(self, node_feat_field): @@ -308,7 +309,7 @@ def has_node_feats(self, ntype): Returns ------- - bool : whether the node type has features. + bool : Whether the node type has features. """ if isinstance(self.node_feat_field, str): return True @@ -323,11 +324,11 @@ def has_edge_feats(self, etype): Parameters ---------- etype : (str, str, str) - The canonical edge type + The canonical edge type. Returns ------- - bool : whether the edge type has features + bool : Whether the edge type has features. """ if isinstance(self.edge_feat_field, str): return True @@ -342,11 +343,11 @@ def has_node_lm_feats(self, ntype): Parameters ---------- ntype : str - The node type + The node type. Returns ------- - bool : whether the node type has features. + bool : Whether the node type has text features. """ return ntype in self._lm_feat_ntypes @@ -356,23 +357,24 @@ def has_edge_lm_feats(self, etype): Parameters ---------- etype : (str, str, str) - The edge type + The edge type. Returns ------- - bool : whether the node type has features. + bool : Whether the edge type has text features. """ return etype in self._lm_feat_etypes def get_node_feats(self, input_nodes, nfeat_fields, device='cpu'): - """ Get the node features + """ Get the node features of the given input nodes. The feature fields are defined + in ``nfeat_fields``. Parameters ---------- input_nodes : Tensor or dict of Tensors - The input node IDs - nfeat_fields : str or dict of list - The node features to collect from graph + The input node IDs. + nfeat_fields : str or dict of [str ...] + The node feature fields to be extracted. device : Pytorch device The device where the returned node features are stored. @@ -390,14 +392,15 @@ def get_node_feats(self, input_nodes, nfeat_fields, device='cpu'): feat_field=nfeat_fields) def get_edge_feats(self, input_edges, efeat_fields, device='cpu'): - """ Get the edge features + """ Get the edge features of the given input edges. The feature fields are defined + in ``efeat_fields``. Parameters ---------- input_edges : Tensor or dict of Tensors - The input edge IDs + The input edge IDs. efeat_fields: str or dict of [str ..] - The edge data fields that stores the edge features to retrieve + The edge feature fields to be extracted. device : Pytorch device The device where the returned edge features are stored. @@ -473,15 +476,15 @@ def _check_node_mask(self, ntypes, masks): return masks def get_unlabeled_node_set(self, train_idxs, mask="train_mask"): - """ Collect nodes not used for training. + """ Get node indexes not having the given mask in the training set. Parameters __________ - train_idxs: dict + train_idxs: dict of Tensor The training set. mask: str or list of str - The node feature field storing the training mask. - Default: "train_mask" + The node feature fields storing the training mask. + Default: "train_mask". Returns ------- @@ -510,19 +513,19 @@ def get_unlabeled_node_set(self, train_idxs, mask="train_mask"): return unlabeled_idxs def get_node_train_set(self, ntypes, mask="train_mask"): - """ Get node training set for nodes of ntypes. + """ Get the training set for the given node types under the given mask. Parameters __________ ntypes: str or list of str Node types to get the training set. mask: str or list of str - The node feature field storing the training mask. - Default: "train_mask" + The node feature fields storing the training mask. + Default: "train_mask". Returns ------- - dict of Tensors : The returned training node indexes + dict of Tensors : The returned training node indexes. """ g = self._g pb = g.get_partition_book() @@ -584,19 +587,19 @@ def _get_node_set(self, ntypes, mask): return idxs, num_data def get_node_val_set(self, ntypes, mask="val_mask"): - """ Get node validation set for nodes of ntypes. + """ Get the validation set for the given node types under the given mask. Parameters __________ ntypes: str or list of str Node types to get the validation set. mask: str or list of str - The node feature field storing the validation mask. - Default: "val_mask" + The node feature fields storing the validation mask. + Default: "val_mask". Returns ------- - dict of Tensors : The returned validation node indexes + dict of Tensors : The returned validation node indexes. """ idxs, num_data = self._get_node_set(ntypes, mask) logging.info('part %d, val %d', get_rank(), num_data) @@ -604,19 +607,19 @@ def get_node_val_set(self, ntypes, mask="val_mask"): return idxs def get_node_test_set(self, ntypes, mask="test_mask"): - """ Get node test set for nodes of ntypes. + """ Get the test set for the given node types under the given mask. Parameters __________ ntypes: str or list of str Node types to get the test set. mask: str or list of str - The node feature field storing the test mask. - Default: "test_mask" + The node feature fields storing the test mask. + Default: "test_mask". Returns ------- - dict of Tensors : The returned test node indexes + dict of Tensors : The returned test node indexes. """ idxs, num_data = self._get_node_set(ntypes, mask) logging.info('part %d, test %d', get_rank(), num_data) @@ -624,19 +627,19 @@ def get_node_test_set(self, ntypes, mask="test_mask"): return idxs def get_node_infer_set(self, ntypes, mask="test_mask"): - """ Get node set for inference. + """ Get inference node set for the given node types under the given mask. - If the mask exists in g.nodes[ntype].data, the inference set + If the mask exists in ``g.nodes[ntype].data``, the inference set is collected based on the mask. - If not, the entire node set are treated as the inference set. + If not exist, the entire node set are treated as the inference set. Parameters __________ ntypes: str or list of str Node types to get the inference set. mask: str or list of str - The node feature field storing the inference mask. - Default: "test_mask" + The node feature fields storing the inference mask. + Default: "test_mask". Returns ------- @@ -738,20 +741,20 @@ def _exclude_reverse_etype(self, etypes, reverse_edge_types_map=None): def get_edge_train_set(self, etypes=None, mask="train_mask", reverse_edge_types_map=None): - """ Get edge training set for edges of etypes. + """ Get the training set for the given edge types under the given mask. Parameters __________ etypes: list of str List of edge types to get the training set. If set to None, all the edge types are included. - Default: None + Default: None. mask: str or list of str - The edge feature field storing the training mask. - Default: "train_mask" - reverse_edge_types_map: dict - A map for reverse edge type. - Default: None + The edge feature fields storing the training mask. + Default: "train_mask". + reverse_edge_types_map: dict of tupeles + A map for reverse edge types in the format of {(edge type):(reversed edge type)}. + Default: None. Returns ------- @@ -820,7 +823,7 @@ def _get_edge_set(self, etypes, mask, reverse_edge_types_map): def get_edge_val_set(self, etypes=None, mask="val_mask", reverse_edge_types_map=None): - """ Get edge validation set for edges of etypes. + """ Get the validation set for the given edge types under the given mask. Parameters __________ @@ -829,13 +832,14 @@ def get_edge_val_set(self, etypes=None, mask="val_mask", If set to None, all the edge types are included. mask: str or list of str The edge feature field storing the val mask. - Default: "val_mask" + Default: "val_mask". reverse_edge_types_map: dict - A map for reverse edge type. + A map for reverse edge types in the format of {(edge type):(reversed edge type)}. + Default: None. Returns ------- - dict of Tensors : The returned validation edge indexes + dict of Tensors : The returned validation edge indexes. """ idxs, num_data = self._get_edge_set(etypes, mask, reverse_edge_types_map) logging.info('part %d, val %d', get_rank(), num_data) @@ -844,7 +848,7 @@ def get_edge_val_set(self, etypes=None, mask="val_mask", def get_edge_test_set(self, etypes=None, mask="test_mask", reverse_edge_types_map=None): - """ Get edge test set for edges of etypes. + """ Get the test set for the given edge types under the given mask. Parameters __________ @@ -853,9 +857,10 @@ def get_edge_test_set(self, etypes=None, mask="test_mask", If set to None, all the edge types are included. mask: str or list of str The edge feature field storing the test mask. - Default: "test_mask" + Default: "test_mask". reverse_edge_types_map: dict - A map for reverse edge type. + A map for reverse edge types in the format of {(edge type):(reversed edge type)}. + Default: None. Returns ------- @@ -867,27 +872,28 @@ def get_edge_test_set(self, etypes=None, mask="test_mask", return idxs def get_edge_infer_set(self, etypes=None, mask="test_mask", reverse_edge_types_map=None): - """ Get edge set for inference. + """ Get the inference set for the given edge types under the given mask. - If the mask exists in g.edges[etype].data, the inference set + If the mask exists in ``g.edges[etype].data``, the inference set is collected based on the mask. - If not, the entire edge set are treated as the inference set. + If not exist, the entire edge set are treated as the inference set. Parameters __________ etypes: list of str List of edge types to get the inference set. If set to None, all the edge types are included. - Default: None + Default: None. mask: str or list of str The edge feature field storing the inference mask. - Default: "test_mask" + Default: "test_mask". reverse_edge_types_map: dict - A map for reverse edge type. + A map for reverse edge types in the format of {(edge type):(reversed edge type)}. + Default: None. Returns ------- - dict of Tensors : The returned inference edge indexes + dict of Tensors : The returned inference edge indexes. """ g = self._g pb = g.get_partition_book() diff --git a/python/graphstorm/gconstruct/remap_result.py b/python/graphstorm/gconstruct/remap_result.py index d88219e1d5..e19eebf7d6 100644 --- a/python/graphstorm/gconstruct/remap_result.py +++ b/python/graphstorm/gconstruct/remap_result.py @@ -40,7 +40,8 @@ BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION, BUILTIN_TASK_NODE_CLASSIFICATION, - BUILTIN_TASK_NODE_REGRESSION) + BUILTIN_TASK_NODE_REGRESSION, + BUILTIN_TASK_LINK_PREDICTION) GS_OUTPUT_FORMAT_PARQUET = "parquet" GS_OUTPUT_FORMAT_CSV = "csv" @@ -655,16 +656,28 @@ def _parse_gs_config(config): node_id_mapping = os.path.join(os.path.dirname(part_config), "raw_id_mappings") predict_dir = config.save_prediction_path emb_dir = config.save_embed_path + task_emb_dirs = [] pred_ntypes = [] pred_etypes = [] if config.multi_tasks is not None: node_predict_dirs = [] edge_predict_dirs = [] - if predict_dir is None: - return node_id_mapping, None, emb_dir, pred_ntypes, pred_etypes # multi-task setting tasks = config.multi_tasks + + for task in tasks: + task_config = task.task_config + task_id = task.task_id + if task.task_type in [BUILTIN_TASK_LINK_PREDICTION]: + if task_config.lp_embed_normalizer is not None: + # There are link prediction node embedding normalizer + # Need to handled the normalized embeddings. + task_emb_dirs.append(task_id) + + if predict_dir is None: + return node_id_mapping, None, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes + for task in tasks: task_config = task.task_config task_id = task.task_id @@ -681,7 +694,7 @@ def _parse_gs_config(config): edge_predict_dirs.append(pred_path) predict_dir = (node_predict_dirs, edge_predict_dirs) - return node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes + return node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes else: task_type = config.task_type if task_type in (BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION): @@ -694,7 +707,7 @@ def _parse_gs_config(config): pred_ntypes = pred_ntypes \ if isinstance(pred_ntypes, list) else [pred_ntypes] - return node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes + return node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes def main(args, gs_config_args): """ main function @@ -714,7 +727,7 @@ def main(args, gs_config_args): gs_args, _ = gs_parser.parse_known_args(gs_config_args) config = GSConfig(gs_args) config.verify_arguments(False) - id_mapping_path, predict_dir, node_emb_dir, pred_ntypes, pred_etypes = \ + id_mapping_path, predict_dir, node_emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = \ _parse_gs_config(config) else: # Case 2: remap_result is called alone. @@ -724,6 +737,10 @@ def main(args, gs_config_args): id_mapping_path = args.node_id_mapping predict_dir = args.prediction_dir node_emb_dir = args.node_emb_dir + # We do not handle the case when there are task specific embeddings + # in multi-task learning, if remap_result is called alone. + # Users need to clean up the node_emb_dir themselves. + task_emb_dirs = [] pred_etypes = args.pred_etypes pred_ntypes = args.pred_ntypes if pred_etypes is not None: @@ -773,7 +790,26 @@ def main(args, gs_config_args): else: # There is no shared file system emb_names = os.listdir(node_emb_dir) - emb_names = [e_name for e_name in emb_names if e_name != "emb_info.json"] + # In single task learning, the node embed dir looks like: + # emb_dir/ + # ntype0 + # ntype1 + # ... + # emb_info.json + # + # In multi-task learning, the node embed dir looks like: + # emb_dir/ + # ntype0 + # ntype1 + # ... + # emb_info.json + # task_id0/ + # task_id1/ + # ... + # We need to exclude both emb_info.json and task_id directories, + # when we are collecting node types with node embeddings. + emb_names = [e_name for e_name in emb_names \ + if e_name not in task_emb_dirs + ["emb_info.json"]] emb_ntypes = emb_names else: @@ -962,6 +998,21 @@ def main(args, gs_config_args): output_func) files_to_remove += emb_files_to_remove + for task_emb_dir in task_emb_dirs: + task_emb_dir = os.path.join(node_emb_dir, task_emb_dir) + # We need to do ID remapping for node embeddings + emb_files_to_remove = \ + remap_node_emb(emb_ntypes, + task_emb_dir, + task_emb_dir, + out_chunk_size, + num_proc, + rank, + world_size, + with_shared_fs, + output_func) + files_to_remove += emb_files_to_remove + if len(pred_etypes) > 0: if isinstance(predict_dir, tuple): _, edge_predict_dirs = predict_dir diff --git a/python/graphstorm/inference/mt_infer.py b/python/graphstorm/inference/mt_infer.py index 142c5636a4..05943ccb9b 100644 --- a/python/graphstorm/inference/mt_infer.py +++ b/python/graphstorm/inference/mt_infer.py @@ -105,7 +105,8 @@ def infer(self, data, """ do_eval = self.evaluator is not None sys_tracker.check('start inferencing') - self._model.eval() + model = self._model + model.eval() # All the tasks share the same GNN encoder so the fanouts are same # for different tasks. @@ -133,13 +134,13 @@ def gen_embs(edge_mask=None): # so the node embeddings are updated inplace. if use_mini_batch_infer: embs = do_mini_batch_inference( - self._model, data, batch_size=infer_batch_size, + model, data, batch_size=infer_batch_size, fanout=fanout, edge_mask=edge_mask, task_tracker=self.task_tracker) else: embs = do_full_graph_inference( - self._model, data, + model, data, fanout=fanout, edge_mask=edge_mask, task_tracker=self.task_tracker) @@ -154,17 +155,29 @@ def gen_embs(edge_mask=None): # before conducting prediction results. if save_embed_path is not None: logging.info("Saving node embeddings") + node_norm_methods = model.node_embed_norm_methods + # Save the original embs first save_gsgnn_embeddings(g, save_embed_path, embs, node_id_mapping_file=node_id_mapping_file, save_embed_format=save_embed_format) barrier() + for task_id, norm_method in node_norm_methods.items(): + if norm_method is None: + continue + normed_embs = model.normalize_task_node_embs(task_id, embs, inplace=False) + save_embed_path = os.path.join(save_embed_path, task_id) + save_gsgnn_embeddings(g, + save_embed_path, + normed_embs, + node_id_mapping_file=node_id_mapping_file, + save_embed_format=save_embed_format) sys_tracker.check('save embeddings') # save relation embedding if any for link prediction tasks if get_rank() == 0: - decoders = self._model.task_decoders + decoders = model.task_decoders for task_id, decoder in decoders.items(): if isinstance(decoder, LinkPredictDistMultDecoder): rel_emb_path = os.path.join(save_embed_path, task_id) @@ -189,7 +202,7 @@ def gen_embs(edge_mask=None): # and edge regression tasks. pre_results = \ multi_task_mini_batch_predict( - self._model, + model, emb=embs, dataloaders=predict_test_loader.dataloaders, task_infos=predict_test_loader.task_infos, @@ -213,9 +226,9 @@ def nfrecon_gen_embs(skip_last_self_loop=False, node_embs=embs): if skip_last_self_loop is True: # Turn off the last layer GNN's self-loop # to compute node embeddings. - self._model.gnn_encoder.skip_last_selfloop() + model.gnn_encoder.skip_last_selfloop() new_embs = gen_embs() - self._model.gnn_encoder.reset_last_selfloop() + model.gnn_encoder.reset_last_selfloop() return new_embs else: # If skip_last_self_loop is False @@ -231,11 +244,11 @@ def nfrecon_gen_embs(skip_last_self_loop=False, node_embs=embs): # Note(xiangsx): In DistDGl, as we are using the # same dist tensor, the node embeddings # are updated inplace. - nfeat_embs = gen_emb_for_nfeat_reconstruct(self._model, nfrecon_gen_embs) + nfeat_embs = gen_emb_for_nfeat_reconstruct(model, nfrecon_gen_embs) nfeat_recon_results = \ multi_task_mini_batch_predict( - self._model, + model, emb=nfeat_embs, dataloaders=dataloaders, task_infos=task_infos, @@ -258,8 +271,14 @@ def nfrecon_gen_embs(skip_last_self_loop=False, node_embs=embs): # For link prediction, do evaluation task by task. lp_test_embs = gen_embs(edge_mask=task_info.task_config.train_mask) - - decoder = self._model.task_decoders[task_info.task_id] + # normalize the node embedding if needed. + # we can do inplace normalization as embeddings are generated + # per lp task. + lp_test_embs = model.normalize_task_node_embs(task_info.task_id, + lp_test_embs, + inplace=True) + + decoder = model.task_decoders[task_info.task_id] ranking = run_lp_mini_batch_predict(decoder, lp_test_embs, dataloader, device) pre_results[task_info.task_id] = ranking diff --git a/python/graphstorm/model/multitask_gnn.py b/python/graphstorm/model/multitask_gnn.py index 02a679eb70..f5b964e33e 100644 --- a/python/graphstorm/model/multitask_gnn.py +++ b/python/graphstorm/model/multitask_gnn.py @@ -19,6 +19,7 @@ import logging import torch as th from torch import nn +import dgl from ..config import (BUILTIN_TASK_NODE_CLASSIFICATION, BUILTIN_TASK_NODE_REGRESSION, @@ -32,7 +33,14 @@ from .node_gnn import run_node_mini_batch_predict from .edge_gnn import run_edge_mini_batch_predict from .lp_gnn import run_lp_mini_batch_predict - +from .utils import LazyDistTensor +from .utils import normalize_node_embs, get_data_range +from ..utils import ( + get_rank, + get_world_size, + barrier, + create_dist_tensor +) class GSgnnMultiTaskModelInterface: """ The interface for GraphStorm multi-task learning. @@ -93,10 +101,108 @@ def __init__(self, alpha_l2norm): self._task_pool = {} self._decoder = nn.ModuleDict() self._loss_fn = nn.ModuleDict() + self._node_embed_norm_methods = {} self._warn_printed = False + def normalize_task_node_embs(self, task_id, embs, inplace=False): + """ Normalize node embeddings when needed. + + normalize_task_node_embs should be called when embs stores embeddings + of every node. + + Parameters + ---------- + task_id: str + Task ID. + embs: dict of Tensors + A dict of node embeddings. + inplace: bool + Whether to do inplace normalization. + + Returns + ------- + new_embs: dict of Tensors + Normalized node embeddings. + """ + if self._node_embed_norm_methods[task_id] is not None: + new_embs = {} + rank = get_rank() + world_size = get_world_size() + for key, emb in embs.items(): + if isinstance(emb, (dgl.distributed.DistTensor, LazyDistTensor)): + # If emb is a distributed tensor, multiple processes are doing + # embdding normalization concurrently. We need to split + # the task. (From full_graph_inference) + start, end = get_data_range(rank, world_size, len(emb)) + new_emb = emb if inplace else \ + create_dist_tensor(emb.shape, + emb.dtype, + name=f"{emb.name}_task_id", + part_policy=emb.part_policy, + persistent=True) + else: + # If emb is just a torch Tensor. do normalization directly. + # (From mini_batch_inference) + start, end = 0, len(emb) + new_emb = emb if inplace else th.clone(emb) + idx = start + while idx + 1024 < end: + new_emb[idx:idx+1024] = \ + self.minibatch_normalize_task_node_embs( + task_id, + {key:emb[idx:idx+1024]})[key] + idx += 1024 + new_emb[idx:end] = \ + self.minibatch_normalize_task_node_embs( + task_id, + {key:emb[idx:end]})[key] + barrier() + new_embs[key] = new_emb + return new_embs + else: + # If normalization method is None + # do nothing. + new_embs = embs + return new_embs + + # pylint: disable = arguments-differ + def minibatch_normalize_task_node_embs(self, task_id, embs): + """ Normalize node embeddings when needed for a mini-batch. + + minibatch_normalize_task_node_embs should be called in + forward() and predict(). + + Parameters + ---------- + task_id: str + Task ID. + embs: dict of Tensors + A dict of node embeddings. + + Returns + ------- + embs: dict of Tensors + Normalized node embeddings. + """ + if self._node_embed_norm_methods[task_id] is not None: + return normalize_node_embs(embs, self._node_embed_norm_methods[task_id]) + else: + return embs + + @property + def node_embed_norm_methods(self): + """ Get per task node embedding normalization method + + Returns + ------- + dict of strings: + Normalization methods + """ + return self._node_embed_norm_methods + def add_task(self, task_id, task_type, - decoder, loss_func): + decoder, loss_func, + embed_norm_method=None): """ Add a task into the multi-task pool Parameters @@ -112,6 +218,8 @@ def add_task(self, task_id, task_type, Task decoder. loss_func: func Loss function. + embed_norm_method: str + Node embedding normalization method. """ assert task_id not in self._task_pool, \ f"Task {task_id} already exists" @@ -120,6 +228,7 @@ def add_task(self, task_id, task_type, self._decoder[task_id] = decoder # add loss func in nn module self._loss_fn[task_id] = loss_func + self._node_embed_norm_methods[task_id] = embed_norm_method @property def alpha_l2norm(self): @@ -277,7 +386,7 @@ def _forward(self, task_id, encoder_data, decoder_data): encode_embs = self.compute_embed_step(blocks, node_feats, input_nodes) # Call emb normalization. - encode_embs = self.normalize_node_embs(encode_embs) + encode_embs = self.minibatch_normalize_task_node_embs(task_id, encode_embs) if task_type in [BUILTIN_TASK_NODE_CLASSIFICATION, BUILTIN_TASK_NODE_REGRESSION]: labels = decoder_data @@ -353,7 +462,7 @@ def predict(self, task_id, mini_batch, return_proba=False): encode_embs = self.compute_embed_step(blocks, node_feats, input_nodes) # Call emb normalization. - encode_embs = self.normalize_node_embs(encode_embs) + encode_embs = self.minibatch_normalize_task_node_embs(task_id, encode_embs) task_type, _ = self.task_pool[task_id] task_decoder = self.decoder[task_id] @@ -415,6 +524,18 @@ def multi_task_mini_batch_predict( res = {} with th.no_grad(): for dataloader, task_info in zip(dataloaders, task_infos): + # normalize the node embedding if needed. + # input emb is shared across different tasks + # so that we can not do inplace normalization. + # + # Note(xiangsx): Currently node embedding normalization + # only supports link prediction tasks. + # model.normalize_task_node_embs does nothing + # for node and edge prediction tasks. + # TODO(xiangsx): Need a more memory efficient design when + # node embedding normalization supports node and edge + # prediction tasks. + emb = model.normalize_task_node_embs(task_info.task_id, emb, inplace=False) if task_info.task_type in \ [BUILTIN_TASK_NODE_CLASSIFICATION, BUILTIN_TASK_NODE_REGRESSION, diff --git a/python/graphstorm/run/gsgnn_mt/gsgnn_mt.py b/python/graphstorm/run/gsgnn_mt/gsgnn_mt.py index 19d63b77b0..304fabbe16 100644 --- a/python/graphstorm/run/gsgnn_mt/gsgnn_mt.py +++ b/python/graphstorm/run/gsgnn_mt/gsgnn_mt.py @@ -348,7 +348,17 @@ def main(config_args): train_data.g, encoder_out_dims, train_task=True) - model.add_task(task.task_id, task.task_type, decoder, loss_func) + # For link prediction, lp_embed_normalizer may be used + # TODO(xiangsx): add embed normalizer for other task types + # in the future. + node_embed_norm_method = task.task_config.lp_embed_normalizer \ + if task.task_type in [BUILTIN_TASK_LINK_PREDICTION] \ + else None + model.add_task(task.task_id, + task.task_type, + decoder, + loss_func, + embed_norm_method=node_embed_norm_method) if not config.no_validation: if val_loader is None: logging.warning("The training data do not have validation set.") @@ -419,7 +429,14 @@ def main(config_args): train_data.g, encoder_out_dims, train_task=True) - model.add_task(task.task_id, task.task_type, decoder, loss_func) + node_embed_norm_method = task.task_config.lp_embed_normalizer \ + if task.task_type in [BUILTIN_TASK_LINK_PREDICTION] \ + else None + model.add_task(task.task_id, + task.task_type, + decoder, + loss_func, + embed_norm_method=node_embed_norm_method) best_model_path = trainer.get_best_model_path() # TODO(zhengda) the model path has to be in a shared filesystem. model.restore_model(best_model_path) @@ -432,6 +449,7 @@ def main(config_args): embeddings = do_full_graph_inference(model, train_data, fanout=config.eval_fanout, task_tracker=tracker) + # Save the original embs first save_full_node_embeddings( train_data.g, config.save_embed_path, @@ -439,6 +457,21 @@ def main(config_args): node_id_mapping_file=config.node_id_mapping_file, save_embed_format=config.save_embed_format) + node_norm_methods = model.node_embed_norm_methods + # save normalized embeddings + for task_id, norm_method in node_norm_methods.items(): + if norm_method is None: + continue + normed_embs = model.normalize_task_node_embs(task_id, embeddings, inplace=False) + save_embed_path = os.path.join(config.save_embed_path, task_id) + save_full_node_embeddings( + train_data.g, + save_embed_path, + normed_embs, + node_id_mapping_file=config.node_id_mapping_file, + save_embed_format=config.save_embed_format) + + def generate_parser(): """ Generate an argument parser """ diff --git a/python/graphstorm/run/gsgnn_mt/mt_infer_gnn.py b/python/graphstorm/run/gsgnn_mt/mt_infer_gnn.py index 718625f3f1..6c4122004b 100644 --- a/python/graphstorm/run/gsgnn_mt/mt_infer_gnn.py +++ b/python/graphstorm/run/gsgnn_mt/mt_infer_gnn.py @@ -218,7 +218,17 @@ def main(config_args): predict_dataloaders.append(data_loader) predict_tasks.append(task) - model.add_task(task.task_id, task.task_type, decoder, loss_func) + # For link prediction, lp_embed_normalizer may be used + # TODO(xiangsx): add embed normalizer for other task types + # in the future. + node_embed_norm_method = task.task_config.lp_embed_normalizer \ + if task.task_type in [BUILTIN_TASK_LINK_PREDICTION] \ + else None + model.add_task(task.task_id, + task.task_type, + decoder, + loss_func, + embed_norm_method=node_embed_norm_method) # Multi-task testing dataloader for node prediction and # edge prediction tasks. diff --git a/python/graphstorm/trainer/lp_trainer.py b/python/graphstorm/trainer/lp_trainer.py index 5308a75204..182e5feecd 100644 --- a/python/graphstorm/trainer/lp_trainer.py +++ b/python/graphstorm/trainer/lp_trainer.py @@ -182,11 +182,11 @@ def fit(self, train_loader, num_epochs, input_nodes = {pos_graph.ntypes[0]: input_nodes} nfeat_fields = train_loader.node_feat_fields input_feats = data.get_node_feats(input_nodes, nfeat_fields, device) - if train_loader.pos_graph_feat_fields is not None: + if train_loader.pos_graph_edge_feat_fields is not None: input_edges = {etype: pos_graph.edges[etype].data[dgl.EID] \ for etype in pos_graph.canonical_etypes} pos_graph_feats = data.get_edge_feats(input_edges, - train_loader.pos_graph_feat_fields, + train_loader.pos_graph_edge_feat_fields, device) else: pos_graph_feats = None diff --git a/python/graphstorm/trainer/mt_trainer.py b/python/graphstorm/trainer/mt_trainer.py index 630e70235d..2f2787baaa 100644 --- a/python/graphstorm/trainer/mt_trainer.py +++ b/python/graphstorm/trainer/mt_trainer.py @@ -180,11 +180,11 @@ def prepare_link_predict_mini_batch(data, task_info, mini_batch, device): nfeat_fields = task_info.dataloader.node_feat_fields node_feats = data.get_node_feats(input_nodes, nfeat_fields, device) - if task_info.dataloader.pos_graph_feat_fields is not None: + if task_info.dataloader.pos_graph_edge_feat_fields is not None: input_edges = {etype: pos_graph.edges[etype].data[dgl.EID] \ for etype in pos_graph.canonical_etypes} pos_graph_feats = data.get_edge_feats(input_edges, - task_info.dataloader.pos_graph_feat_fields, + task_info.dataloader.pos_graph_edge_feat_fields, device) else: pos_graph_feats = None @@ -641,6 +641,12 @@ def gen_embs(edge_mask=None): # For link prediction, do evaluation task # by task. lp_test_embs = gen_embs(edge_mask=task_info.task_config.train_mask) + # normalize the node embedding if needed. + # we can do inplace normalization as embeddings are generated + # per lp task. + lp_test_embs = model.normalize_task_node_embs(task_info.task_id, + lp_test_embs, + inplace=True) decoder = model.task_decoders[task_info.task_id] val_scores = run_lp_mini_batch_predict(decoder, diff --git a/tests/end2end-tests/graphstorm-mt/mgpu_test.sh b/tests/end2end-tests/graphstorm-mt/mgpu_test.sh index aceb326ac6..67eb3211c8 100644 --- a/tests/end2end-tests/graphstorm-mt/mgpu_test.sh +++ b/tests/end2end-tests/graphstorm-mt/mgpu_test.sh @@ -674,6 +674,49 @@ python3 $GS_HOME/tests/end2end-tests/check_infer.py --train-embout /data/gsgnn_m error_and_exit $? -rm -fr /data/gsgnn_mt/infer-emb/ -rm -fr /data/gsgnn_mt/prediction/ +rm -fr /data/gsgnn_mt/ rm -fr /tmp/infer_log.txt + + +echo "**************[Multi-task] dataset: Movielens, RGCN layer 1, node feat: fixed HF BERT, BERT nodes: movie, inference: full-graph, save model" +python3 -m graphstorm.run.gs_multi_task_learning --workspace $GS_HOME/training_scripts/gsgnn_mt --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_multi_task_train_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_lp_norm.yaml --save-model-path /data/gsgnn_mt/ --save-model-frequency 1000 --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --use-mini-batch-infer False --save-embed-path /data/gsgnn_mt/emb/ + +error_and_exit $? + +cnt=$(grep "save_embed_path: /data/gsgnn_mt/emb/" /tmp/train_log.txt | wc -l) +if test $cnt -lt 1 +then + echo "We use SageMaker task tracker, we should have save_embed_path" + exit -1 +fi + +cnt=$(ls -l /data/gsgnn_mt/emb/ | wc -l) +cnt=$[cnt - 1] +if test $cnt != 3 +then + echo "The number of saved embs $cnt is not equal to 3. Should have two for movie and user and One for link-prediction-subtask normalized embedding." +fi + +echo "**************[Multi-task] dataset: Movielens, RGCN layer 1, node feat: fixed HF BERT, BERT nodes: movie, inference with test" +python3 -m graphstorm.run.gs_multi_task_learning --inference --workspace $GS_HOME/inference_scripts/mt_infer --num-trainers $NUM_INFERs --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_multi_task_train_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_lp_norm_with_mask_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_mt/infer-emb/ --restore-model-path /data/gsgnn_mt/epoch-2 --save-prediction-path /data/gsgnn_mt/prediction/ --logging-file /tmp/infer_log.txt --preserve-input True + +error_and_exit $? + +cnt=$(ls -l /data/gsgnn_mt/infer-emb/ | wc -l) +cnt=$[cnt - 2] +if test $cnt != 4 +then + echo "The number of saved embs $cnt is not equal to 3. Should have two for movie and user and One for link-prediction-subtask normalized embedding." +fi + +python3 $GS_HOME/tests/end2end-tests/check_infer.py --train-embout /data/gsgnn_mt/emb/ --infer-embout /data/gsgnn_mt/infer-emb/ + +error_and_exit $? + +python3 $GS_HOME/tests/end2end-tests/check_infer.py --train-embout /data/gsgnn_mt/emb/link_prediction-user_rating_movie --infer-embout /data/gsgnn_mt/infer-emb/link_prediction-user_rating_movie + +error_and_exit $? + +rm -fr /data/gsgnn_mt/ +rm -fr /tmp/train_log.txt +rm -fr /tmp/infer_log.txt \ No newline at end of file diff --git a/tests/unit-tests/gconstruct/test_remap_result.py b/tests/unit-tests/gconstruct/test_remap_result.py index 7ac1c7156e..ca48a81586 100644 --- a/tests/unit-tests/gconstruct/test_remap_result.py +++ b/tests/unit-tests/gconstruct/test_remap_result.py @@ -26,6 +26,7 @@ from numpy.testing import assert_equal, assert_almost_equal from graphstorm.config import GSConfig +from graphstorm.config.config import get_mttask_id from graphstorm.config import (BUILTIN_TASK_NODE_CLASSIFICATION, BUILTIN_TASK_NODE_REGRESSION, BUILTIN_TASK_EDGE_CLASSIFICATION, @@ -409,13 +410,14 @@ def test_parse_config(): setattr(config, "_task_type", BUILTIN_TASK_NODE_CLASSIFICATION) setattr(config, "_target_ntype", target_ntype) setattr(config, "_multi_tasks", None) - node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes = _parse_gs_config(config) + node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = _parse_gs_config(config) assert node_id_mapping == os.path.join(tmpdirname, "raw_id_mappings") assert predict_dir == save_prediction_path assert emb_dir == save_embed_path assert len(pred_ntypes) == 1 assert pred_ntypes[0] == target_ntype assert len(pred_etypes) == 0 + assert len(task_emb_dirs) == 0 target_etype = ["n0,r0,n1"] config = GSConfig.__new__(GSConfig) @@ -426,13 +428,14 @@ def test_parse_config(): setattr(config, "_target_etype", target_etype) setattr(config, "_multi_tasks", None) - node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes = _parse_gs_config(config) + node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = _parse_gs_config(config) assert node_id_mapping == os.path.join(tmpdirname, "raw_id_mappings") assert predict_dir == save_prediction_path assert emb_dir == save_embed_path assert len(pred_ntypes) == 0 assert len(pred_etypes) == 1 assert pred_etypes[0] == ["n0", "r0", "n1"] + assert len(task_emb_dirs) == 0 # multi-task config multi_task_config = [ @@ -470,9 +473,10 @@ def test_parse_config(): "link_prediction" : { "num_negative_edges": 4, "batch_size": 128, - "exclude_training_targets": False + "exclude_training_targets": False, + "lp_embed_normalizer": "l2_norm" } - } + }, ] config = GSConfig.__new__(GSConfig) @@ -480,7 +484,7 @@ def test_parse_config(): setattr(config, "_save_prediction_path", save_prediction_path) setattr(config, "_save_embed_path", save_embed_path) config._parse_multi_tasks(multi_task_config) - node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes = _parse_gs_config(config) + node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = _parse_gs_config(config) assert node_id_mapping == os.path.join(tmpdirname, "raw_id_mappings") assert isinstance(predict_dir, tuple) @@ -498,14 +502,20 @@ def test_parse_config(): assert len(pred_etypes) == 2 assert pred_etypes[0] == ['n0', 'r0', 'r1'] assert pred_etypes[1] == ['n0', 'r0', 'r2'] + print(task_emb_dirs) + assert len(task_emb_dirs) == 1 + assert task_emb_dirs[0] == get_mttask_id( + task_type="link_prediction", + etype="ALL_ETYPE") # there is no predict path # it will use emb_path + multi_task_config[4]["link_prediction"].pop("lp_embed_normalizer") config = GSConfig.__new__(GSConfig) setattr(config, "_part_config", part_path) setattr(config, "_save_embed_path", save_embed_path) config._parse_multi_tasks(multi_task_config) - node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes = _parse_gs_config(config) + node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = _parse_gs_config(config) assert node_id_mapping == os.path.join(tmpdirname, "raw_id_mappings") assert isinstance(predict_dir, tuple) node_predict_dirs, edge_predict_dirs = predict_dir @@ -515,12 +525,13 @@ def test_parse_config(): assert node_predict_dirs[1] == os.path.join(save_embed_path, config.multi_tasks[1].task_id) assert edge_predict_dirs[0] == os.path.join(save_embed_path, config.multi_tasks[2].task_id) assert edge_predict_dirs[1] == os.path.join(save_embed_path, config.multi_tasks[3].task_id) + assert len(task_emb_dirs) == 0 # there is no predict path and emb path config = GSConfig.__new__(GSConfig) setattr(config, "_part_config", part_path) config._parse_multi_tasks(multi_task_config) - node_id_mapping, predict_dir, emb_dir, pred_ntypes, pred_etypes = _parse_gs_config(config) + node_id_mapping, predict_dir, emb_dir, task_emb_dirs, pred_ntypes, pred_etypes = _parse_gs_config(config) assert predict_dir is None assert emb_dir is None diff --git a/tests/unit-tests/test_gnn.py b/tests/unit-tests/test_gnn.py index eaeda9f48a..4141651ce4 100644 --- a/tests/unit-tests/test_gnn.py +++ b/tests/unit-tests/test_gnn.py @@ -31,6 +31,7 @@ from numpy.testing import assert_almost_equal, assert_equal import dgl +from dgl.distributed import DistTensor from graphstorm.config import GSConfig, TaskInfo from graphstorm.config import BUILTIN_LP_DOT_DECODER @@ -39,7 +40,8 @@ BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION, BUILTIN_TASK_LINK_PREDICTION, - BUILTIN_TASK_RECONSTRUCT_NODE_FEAT) + BUILTIN_TASK_RECONSTRUCT_NODE_FEAT, + GRAPHSTORM_LP_EMB_L2_NORMALIZATION) from graphstorm.model import GSNodeEncoderInputLayer, RelationalGCNEncoder from graphstorm.model import GSgnnNodeModel, GSgnnEdgeModel from graphstorm.model import GSLMNodeEncoderInputLayer, GSPureLMNodeInputLayer @@ -1815,6 +1817,121 @@ class DummyLPPredLoss(nn.Module): def forward(self, pos_score, neg_score): return pos_score["n0"] + neg_score["n0"] +def test_multi_task_norm_node_embs(): + mt_model = GSgnnMultiTaskSharedEncoderModel(0.1) + mt_model.add_task("nc_task", + BUILTIN_TASK_NODE_CLASSIFICATION, + DummyNCDecoder(), + DummyPredLoss(), + "") + mt_model.add_task("nr_task", + BUILTIN_TASK_NODE_REGRESSION, + DummyNRDecoder(), + DummyPredLoss(), + GRAPHSTORM_LP_EMB_L2_NORMALIZATION) + + mt_model.add_task("ec_task", + BUILTIN_TASK_EDGE_CLASSIFICATION, + DummyECDecoder(), + DummyPredLoss(), + "") + + mt_model.add_task("er_task", + BUILTIN_TASK_EDGE_REGRESSION, + DummyERDecoder(), + DummyPredLoss(), + GRAPHSTORM_LP_EMB_L2_NORMALIZATION) + + mt_model.add_task("lp_task", + BUILTIN_TASK_LINK_PREDICTION, + DummyLPDecoder(), + DummyLPPredLoss(), + "") + + mt_model.add_task("lp_task2", + BUILTIN_TASK_LINK_PREDICTION, + DummyLPDecoder(), + DummyLPPredLoss(), + GRAPHSTORM_LP_EMB_L2_NORMALIZATION) + + embs = { + "n0": th.rand((10,16)), + "n1": th.rand((20,16)) + } + norm_embs = { + "n0": F.normalize(embs["n0"]), + "n1": F.normalize(embs["n1"]) + } + + new_embs = mt_model.normalize_task_node_embs("nc_task", embs, inplace=False) + assert_equal(embs["n0"].numpy(), new_embs["n0"].numpy()) + assert_equal(embs["n1"].numpy(), new_embs["n1"].numpy()) + + new_embs = mt_model.normalize_task_node_embs("nr_task", embs, inplace=False) + assert_equal(norm_embs["n0"].numpy(), new_embs["n0"].numpy()) + assert_equal(norm_embs["n1"].numpy(), new_embs["n1"].numpy()) + + new_embs = mt_model.normalize_task_node_embs("ec_task", embs, inplace=False) + assert_equal(embs["n0"].numpy(), new_embs["n0"].numpy()) + assert_equal(embs["n1"].numpy(), new_embs["n1"].numpy()) + + new_embs = mt_model.normalize_task_node_embs("er_task", embs, inplace=False) + assert_equal(norm_embs["n0"].numpy(), new_embs["n0"].numpy()) + assert_equal(norm_embs["n1"].numpy(), new_embs["n1"].numpy()) + + inplace_emb = { + "n0": th.clone(embs["n0"]), + "n1": th.clone(embs["n1"]) + } + mt_model.normalize_task_node_embs("lp_task", inplace_emb, inplace=True) + assert_equal(embs["n0"].numpy(), inplace_emb["n0"].numpy()) + assert_equal(embs["n1"].numpy(), inplace_emb["n1"].numpy()) + + mt_model.normalize_task_node_embs("lp_task2", inplace_emb, inplace=True) + assert_equal(norm_embs["n0"].numpy(), inplace_emb["n0"].numpy()) + assert_equal(norm_embs["n1"].numpy(), inplace_emb["n1"].numpy()) + +def test_multi_task_norm_node_embs_dist(): + mt_model = GSgnnMultiTaskSharedEncoderModel(0.1) + mt_model.add_task("lp_task", + BUILTIN_TASK_LINK_PREDICTION, + DummyLPDecoder(), + DummyLPPredLoss(), + "") + + mt_model.add_task("lp_task2", + BUILTIN_TASK_LINK_PREDICTION, + DummyLPDecoder(), + DummyLPPredLoss(), + GRAPHSTORM_LP_EMB_L2_NORMALIZATION) + + with tempfile.TemporaryDirectory() as tmpdirname: + # get the test dummy distributed graph + g, _ = generate_dummy_dist_graph(tmpdirname, size="tiny") + + embs = {} + norm_embs = {} + dist_embs = {} + + for ntype in g.ntypes: + embs[ntype] = th.rand(g.number_of_nodes(ntype), 16) + norm_embs[ntype] = F.normalize(embs[ntype]) + dist_embs[ntype] = DistTensor((g.number_of_nodes(ntype), 16), + dtype=th.float32, name=f'ntype-{ntype}', + part_policy=g.get_node_partition_policy(ntype)) + dist_embs[ntype][th.arange(g.number_of_nodes(ntype))] = embs[ntype][:] + + new_embs = mt_model.normalize_task_node_embs("lp_task", dist_embs, inplace=False) + for ntype in g.ntypes: + assert_equal(embs[ntype].numpy(), new_embs[ntype][th.arange(g.number_of_nodes(ntype))].numpy()) + + new_embs = mt_model.normalize_task_node_embs("lp_task2", dist_embs, inplace=False) + for ntype in g.ntypes: + assert_equal(norm_embs[ntype].numpy(), new_embs[ntype][th.arange(g.number_of_nodes(ntype))].numpy()) + + dgl.distributed.kvstore.close_kvstore() + + def test_multi_task_forward(): mt_model = GSgnnMultiTaskSharedEncoderModel(0.1) @@ -1850,7 +1967,7 @@ def check_forward(mock_normalize_node_embs, mock_compute_emb, mock_input_embed): - def normalize_size_effect_func(embs): + def normalize_size_effect_func(task_id, embs): return embs def compute_side_effect_func(blocks, node_feats, input_nodes): @@ -1981,7 +2098,7 @@ def check_forward(mock_normalize_node_embs, mock_compute_emb, mock_input_embed): - def normalize_size_effect_func(embs): + def normalize_size_effect_func(task_id, embs): return embs def compute_side_effect_func(blocks, node_feats, input_nodes): @@ -2315,6 +2432,8 @@ def check_call_gen_embs(skip_last_self_loop): if __name__ == '__main__': test_node_feat_reconstruct() + test_multi_task_norm_node_embs() + test_multi_task_norm_node_embs_dist() test_multi_task_forward() test_multi_task_predict() test_multi_task_mini_batch_predict() diff --git a/tests/unit-tests/util.py b/tests/unit-tests/util.py index 963f68f23d..39d3672dee 100644 --- a/tests/unit-tests/util.py +++ b/tests/unit-tests/util.py @@ -108,6 +108,12 @@ def __init__(self, encoder_model, decoders, has_sparse=False): super(DummyGSgnnMTModel, self).__init__(encoder_model, has_sparse) self._decoders = decoders + @property + def node_embed_norm_methods(self): + return {} + + def normalize_task_node_embs(self, task_id, embs, inplace=False): + return embs def forward(self, task_mini_batches): pass diff --git a/training_scripts/gsgnn_mt/ml_nc_lp_norm.yaml b/training_scripts/gsgnn_mt/ml_nc_lp_norm.yaml new file mode 100644 index 0000000000..261a1c6106 --- /dev/null +++ b/training_scripts/gsgnn_mt/ml_nc_lp_norm.yaml @@ -0,0 +1,66 @@ +--- +version: 1.0 +gsf: + basic: + backend: gloo + verbose: false + save_perf_results_path: null + batch_size: 32 + node_feat_name: + - user:feat + - movie:title + gnn: + model_encoder_type: rgcn + fanout: "4" + num_layers: 1 + hidden_size: 32 + use_mini_batch_infer: true + input: + restore_model_path: null + output: + save_model_path: null + save_embed_path: null + hyperparam: + dropout: 0. + lr: 0.001 + lm_tune_lr: 0.0001 + num_epochs: 3 + wd_l2norm: 0 + no_validation: false + rgcn: + num_bases: -1 + use_self_loop: true + sparse_optimizer_lr: 1e-2 + use_node_embeddings: false + multi_task_learning: + - node_classification: + target_ntype: "movie" + label_field: "label" + multilabel: false + num_classes: 19 + batch_size: 16 # will overwrite the global batch_size + mask_fields: + - "train_mask_c0" # node classification mask 0 + - "val_mask_c0" + - "test_mask_c0" + task_weight: 1.0 + eval_metric: + - "accuracy" + - link_prediction: + lp_loss_func: "contrastive" + num_negative_edges: 4 + num_negative_edges_eval: 100 + train_negative_sampler: joint + eval_etype: + - "user,rating,movie" + train_etype: + - "user,rating,movie" + exclude_training_targets: true + reverse_edge_types_map: + - user,rating,rating-rev,movie + batch_size: 128 # will overwrite the global batch_size + mask_fields: + - "train_mask_field_lp" + - "val_mask_field_lp" + - null # empty means there is no test mask + task_weight: 1.0 diff --git a/tutorial/distributed_construction.png b/tutorial/distributed_construction.png new file mode 100644 index 0000000000..357bae2f94 Binary files /dev/null and b/tutorial/distributed_construction.png differ