diff --git a/docker/build_docker_parmetis.sh b/docker/build_docker_parmetis.sh
new file mode 100644
index 0000000000..a53c55555f
--- /dev/null
+++ b/docker/build_docker_parmetis.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+set -eox pipefail
+
+# process argument 1: graphstorm home folder
+if [ -z "$1" ]; then
+    echo "Please provide a path to the root directory of the GraphStorm repository."
+    echo "For example, ./build_docker_parmetis.sh ../ graphstorm parmetis-cpu"
+    exit 1
+else
+    GSF_HOME="$1"
+fi
+
+# process argument 2: docker image name, default is graphstorm
+if [ -z "$2" ]; then
+    IMAGE_NAME="graphstorm"
+else
+    IMAGE_NAME="$2"
+fi
+
+# process argument 3: image's tag name, default is 'parmetis-cpu'
+if [ -z "$3" ]; then
+    TAG="parmetis-cpu"
+else
+    TAG="$3"
+fi
+
+# Copy scripts and tools code to the docker folder
+mkdir -p $GSF_HOME"/docker/code"
+cp -r $GSF_HOME"/python" $GSF_HOME"/docker/code/python"
+cp -r $GSF_HOME"/examples" $GSF_HOME"/docker/code/examples"
+cp -r $GSF_HOME"/inference_scripts" $GSF_HOME"/docker/code/inference_scripts"
+cp -r $GSF_HOME"/tools" $GSF_HOME"/docker/code/tools"
+cp -r $GSF_HOME"/training_scripts" $GSF_HOME"/docker/code/training_scripts"
+
+aws ecr-public get-login-password --region us-east-1 | \
+    docker login --username AWS --password-stdin public.ecr.aws
+
+# Build OSS docker for EC2 instances that can pull ECR docker images
+DOCKER_FULLNAME="${IMAGE_NAME}:${TAG}"
+
+echo "Build a local docker image ${DOCKER_FULLNAME} for ParMETIS"
+
+SOURCE_IMAGE="public.ecr.aws/ubuntu/ubuntu:20.04_stable"
+
+DOCKER_BUILDKIT=1 docker build \
+    --build-arg SOURCE=${SOURCE_IMAGE} \
+    -f "${GSF_HOME}/docker/parmetis/Dockerfile.parmetis" . -t $DOCKER_FULLNAME
+
+# remove the temporary code folder
+rm -rf $GSF_HOME"/docker/code"
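For reference, a typical invocation of the script above might look like the following. The image name and tag shown are just the defaults, and the script is assumed to be run from the `docker/` folder so that the build context (`.`) contains the temporary `code/` directory it creates:

```bash
# Build a local ParMETIS-enabled CPU image (image name and tag are optional)
cd docker
bash build_docker_parmetis.sh ../ graphstorm parmetis-cpu
# Result: a local image named graphstorm:parmetis-cpu
```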
diff --git a/docker/parmetis/Dockerfile.parmetis b/docker/parmetis/Dockerfile.parmetis
new file mode 100644
index 0000000000..9b16c49217
--- /dev/null
+++ b/docker/parmetis/Dockerfile.parmetis
@@ -0,0 +1,111 @@
+ARG SOURCE
+
+FROM ${SOURCE} as base
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV HOME=/root
+
+ARG DGL_VERSION=1.1.3
+ARG OGB_VERSION=1.3.6
+ARG TORCH_VERSION=2.1.2
+ARG TRANSFORMERS_VERSION=4.28.1
+
+RUN apt update && apt install -y --no-install-recommends \
+    git \
+    libicu-dev \
+    openssh-client \
+    openssh-server \
+    python3.9 \
+    python3.9-distutils \
+    python3.9-venv \
+    gfortran \
+    cmake \
+    build-essential \
+    g++ \
+    vim \
+    wget \
+    && rm -rf /var/lib/apt/lists/*
+# Create and activate a Python venv
+RUN python3.9 -m venv /opt/gs-venv
+ENV PATH="/opt/gs-venv/bin:$PATH"
+
+# Install GraphStorm dependencies
+RUN pip install \
+    boto3==1.34.89 \
+    botocore==1.34.89 \
+    h5py==3.11.0 \
+    networkx==3.1 \
+    psutil==5.9.8 \
+    pyarrow==14.0.0 \
+    pydantic==2.7.0 \
+    scikit-learn==1.4.2 \
+    scipy==1.13.0 \
+    pyyaml \
+    && rm -rf /root/.cache
+
+# Install torch, DGL, and GSF deps that require torch
+RUN pip install \
+    torch==${TORCH_VERSION} \
+    --index-url https://download.pytorch.org/whl/cpu \
+    && rm -rf /root/.cache
+
+RUN pip install \
+    dgl==${DGL_VERSION} \
+    ogb==${OGB_VERSION} \
+    transformers==${TRANSFORMERS_VERSION} \
+    -f https://data.dgl.ai/wheels-internal/repo.html && rm -rf /root/.cache
+
+FROM base as runtime
+
+ENV PYTHONPATH="/root/dgl/tools/:${PYTHONPATH}"
+
+# Download DGL source code
+RUN cd /root; git clone --single-branch --branch 2.2.x https://github.com/dmlc/dgl.git
+
+# Copy GraphStorm source and add to PYTHONPATH
+RUN mkdir -p /graphstorm
+COPY code/python/graphstorm /graphstorm/python/graphstorm
+ENV PYTHONPATH="/graphstorm/python/:${PYTHONPATH}"
+
+# Copy GraphStorm scripts and tools
+COPY code/examples /graphstorm/examples
+COPY code/inference_scripts /graphstorm/inference_scripts
+COPY code/tools /graphstorm/tools
+COPY code/training_scripts /graphstorm/training_scripts
+
+# Install GKlib
+RUN cd /root; git clone --single-branch --branch master https://github.com/KarypisLab/GKlib; cd GKlib; make; make install
+
+# Install METIS
+RUN cd /root; git clone --single-branch --branch master https://github.com/KarypisLab/METIS.git; cd METIS; \
+    make config shared=1 cc=gcc prefix=/root/local i64=1; make install
+
+# Install Open MPI
+RUN wget https://download.open-mpi.org/release/open-mpi/v4.1/openmpi-4.1.1.tar.gz && \
+    tar -xzvf openmpi-4.1.1.tar.gz && \
+    cd openmpi-4.1.1 && \
+    ./configure --prefix=/usr/local && \
+    make all && \
+    make install && \
+    ldconfig && rm -rf ../openmpi-4.1.1.tar.gz
+
+# Install ParMETIS (PM4GNN)
+RUN cd /root; git clone --single-branch --branch main https://github.com/KarypisLab/PM4GNN.git; cd PM4GNN; \
+    make config cc=mpicc prefix=/root/local; make install
+ENV PATH=$PATH:/root/local/bin
+ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/root/local/lib/
+
+# Set up SSH access
+ENV SSH_PORT=2222
+
+RUN cat /etc/ssh/sshd_config > /tmp/sshd_config && \
+    sed "0,/^#Port 22/s//Port ${SSH_PORT}/" /tmp/sshd_config > /etc/ssh/sshd_config
+ENV SSHDIR $HOME/.ssh
+RUN mkdir -p ${SSHDIR} \
+    && ssh-keygen -t rsa -f ${SSHDIR}/id_rsa -N '' \
+    && cp ${SSHDIR}/id_rsa.pub ${SSHDIR}/authorized_keys \
+    && mkdir /run/sshd
+
+EXPOSE ${SSH_PORT}
+
+CMD ["/usr/sbin/sshd", "-D"]
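At runtime the image's only job is to run `sshd` on port 2222 so that `mpirun` can reach every instance in the partition cluster. A minimal, hypothetical smoke test of the image built above (the container name and networking choices are illustrative, not prescribed by this patch):

```bash
# Launch one container with host networking so SSH/MPI traffic on port 2222
# can flow between instances; repeat on every instance of the cluster.
docker run -d --name gs-parmetis --network host graphstorm:parmetis-cpu

# The image generates its own key pair and authorized_keys, so a loopback
# SSH check from inside the container should succeed on the non-default port.
docker exec gs-parmetis ssh -o StrictHostKeyChecking=no -p 2222 localhost hostname
```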
""" from .random_partition import (RandomPartitionAlgorithm) +from .metis_partition import (ParMetisPartitionAlgorithm) +from .partition_config import (ParMETISConfig) diff --git a/python/graphstorm/gpartition/dist_partition_graph.py b/python/graphstorm/gpartition/dist_partition_graph.py index fdde707342..399390a04e 100644 --- a/python/graphstorm/gpartition/dist_partition_graph.py +++ b/python/graphstorm/gpartition/dist_partition_graph.py @@ -29,7 +29,8 @@ from typing import Dict from threading import Thread -from graphstorm.gpartition import RandomPartitionAlgorithm +from graphstorm.gpartition import (ParMetisPartitionAlgorithm, ParMETISConfig, + RandomPartitionAlgorithm) from graphstorm.utils import get_log_level @@ -117,6 +118,10 @@ def main(): part_start = time.time() if args.partition_algorithm == "random": partitioner = RandomPartitionAlgorithm(metadata_dict) + elif args.partition_algorithm == "parmetis": + partition_config = ParMETISConfig(args.ip_list, args.input_path, + args.dgl_tool_path, args.metadata_filename) + partitioner = ParMetisPartitionAlgorithm(metadata_dict, partition_config) else: raise RuntimeError(f"Unknown partition algorithm {args.part_algorithm}") @@ -161,7 +166,7 @@ def parse_args() -> argparse.Namespace: argparser.add_argument("--dgl-tool-path", type=str, help="The path to dgl/tools") argparser.add_argument("--partition-algorithm", type=str, default="random", - choices=["random"], help="Partition algorithm to use.") + choices=["random", "parmetis"], help="Partition algorithm to use.") argparser.add_argument("--ip-list", type=str, help="A file storing the ip list of instances of the partition cluster.") argparser.add_argument("--do-dispatch", action='store_true') diff --git a/python/graphstorm/gpartition/metis_partition.py b/python/graphstorm/gpartition/metis_partition.py new file mode 100644 index 0000000000..cb34449f1d --- /dev/null +++ b/python/graphstorm/gpartition/metis_partition.py @@ -0,0 +1,204 @@ +""" + Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Parmetis partition assignment +""" +import os +import logging +import json +import subprocess +import sys + +from .partition_algo_base import LocalPartitionAlgorithm + + +class ParMetisPartitionAlgorithm(LocalPartitionAlgorithm): + """ + Multiple-instances metis partitioning algorithm. + + The partition algorithm accepts the intermediate output from GraphStorm + gs-processing which matches the requirements of the DGL distributed + partitioning pipeline. + + + Parameters + ---------- + metadata_dict: dict + DGL "Chunked graph data" JSON, as defined in + https://docs.dgl.ai/guide/distributed-preprocessing.html#specification + metis_config: ParMETISConfig + Configuration object for ParMETIS. 
diff --git a/python/graphstorm/gpartition/metis_partition.py b/python/graphstorm/gpartition/metis_partition.py
new file mode 100644
index 0000000000..cb34449f1d
--- /dev/null
+++ b/python/graphstorm/gpartition/metis_partition.py
@@ -0,0 +1,204 @@
+"""
+    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+    ParMETIS partition assignment
+"""
+import os
+import logging
+import json
+import subprocess
+import sys
+
+from .partition_algo_base import LocalPartitionAlgorithm
+
+
+class ParMetisPartitionAlgorithm(LocalPartitionAlgorithm):
+    """
+    Multi-instance ParMETIS partitioning algorithm.
+
+    The partition algorithm accepts the intermediate output of GraphStorm
+    gs-processing, which matches the requirements of the DGL distributed
+    partitioning pipeline.
+
+    Parameters
+    ----------
+    metadata_dict: dict
+        DGL "Chunked graph data" JSON, as defined in
+        https://docs.dgl.ai/guide/distributed-preprocessing.html#specification
+    metis_config: ParMETISConfig
+        Configuration object for ParMETIS.
+    """
+
+    def __init__(self, metadata_dict, metis_config):
+        super().__init__(metadata_dict)
+        self.metis_config = metis_config
+
+    def _launch_preprocess(self, num_parts, input_path, ip_list, dgl_tool_path, metadata_filename):
+        """ Launch the ParMETIS preprocessing script.
+
+        Parameters
+        ----------
+        num_parts: int
+            Number of graph partitions
+        input_path: str
+            Path to the input graph data
+        ip_list: str
+            Path to the IP list file after ports have been assigned
+        dgl_tool_path: str
+            Path to dgl/tools, which is added to the PYTHONPATH
+        metadata_filename: str
+            Name of the graph metadata (schema) configuration file
+        """
+        command = f"mpirun -np {num_parts} --allow-run-as-root --hostfile {ip_list} \
+                  -wdir {input_path} \
+                  {sys.executable} {dgl_tool_path}/distpartitioning/parmetis_preprocess.py \
+                  --input_dir {input_path} \
+                  --schema_file {metadata_filename} \
+                  --output_dir {input_path} --num_parts {num_parts}"
+
+        if self.run_command(command, "preprocess"):
+            logging.info("Successfully executed the ParMETIS preprocess step.")
+            return True
+        else:
+            logging.error("Failed to execute the ParMETIS preprocess step.")
+            return False
+
+    def _launch_parmetis(self, num_parts, input_path, ip_list, graph_name):
+        """ Launch the ParMETIS partitioning script.
+
+        Parameters
+        ----------
+        num_parts: int
+            Number of graph partitions
+        input_path: str
+            Path to the input graph data
+        ip_list: str
+            Path to the IP list file after ports have been assigned
+        graph_name: str
+            Name of the graph
+        """
+        assert os.path.exists(os.path.expanduser("~/local/bin/pm_dglpart")), \
+            "pm_dglpart not found in ~/local/bin/"
+        command = f"mpirun -np 1 --allow-run-as-root \
+                  --hostfile {ip_list} \
+                  --mca orte_base_help_aggregate 0 -mca btl_tcp_if_include eth0 \
+                  -wdir {input_path} \
+                  ~/local/bin/pm_dglpart {graph_name} {num_parts} {input_path}/parmetis_nfiles.txt \
+                  {input_path}/parmetis_efiles.txt"
+
+        if self.run_command(command, "parmetis"):
+            logging.info("Successfully executed the ParMETIS partitioning step.")
+            return True
+        else:
+            logging.error("Failed to execute the ParMETIS partitioning step.")
+            return False
+
+    def _launch_postprocess(self, num_parts, input_data_path, dgl_tool_path,
+                            metadata_filename, graph_name, partition_dir):
+        """ Launch the postprocessing step, which translates the nid-to-partid mapping
+        into per-node-type partid mappings.
+
+        Parameters
+        ----------
+        num_parts: int
+            Number of graph partitions
+        input_data_path: str
+            Path to the input graph data
+        dgl_tool_path: str
+            Path to dgl/tools, which is added to the PYTHONPATH
+        metadata_filename: str
+            Name of the graph metadata (schema) configuration file
+        graph_name: str
+            Name of the graph used in the ParMETIS step
+        partition_dir: str
+            Output path for the partition assignments
+        """
+        command = f"{sys.executable} {dgl_tool_path}/distpartitioning/parmetis_postprocess.py \
+                  --postproc_input_dir {input_data_path} \
+                  --schema_file {metadata_filename} \
+                  --parmetis_output_file {input_data_path}/{graph_name}_part.{num_parts} \
+                  --partitions_dir {partition_dir}"
+
+        if self.run_command(command, "postprocess"):
+            logging.info("Successfully executed the ParMETIS postprocess step.")
+            return True
+        else:
+            logging.error("Failed to execute the ParMETIS postprocess step.")
+            return False
+
+    def run_command(self, command, stream):
+        """Execute a command and check for its success."""
+        logging.info("Executing command: %s", command)
+        try:
+            # Execute the command and check if it completes successfully
+            result = subprocess.run(command, shell=True, check=True, text=True,
+                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            if stream in ("preprocess", "postprocess"):
+                logging.info("Command output: %s", result.stderr)
+            else:
+                logging.info("Command output: %s", result.stdout)
+            return True  # Return True if the command was successful
+        except subprocess.CalledProcessError as e:
+            logging.error("Error executing command: %s", e.stderr)
+            return False  # Return False if the command failed
+
+    def assigned_port(self, ip_file, port="2222"):
+        """Assign a port to each IP address, as required by mpirun."""
+        if not os.path.isfile(ip_file):
+            raise ValueError("ip file does not exist")
+        # mpirun needs port=2222 explicitly assigned in the ip list file
+        # when running in the docker environment
+        with open(ip_file, 'r', encoding='utf-8') as file:
+            # Read all lines from the input file
+            ip_addresses = file.readlines()
+
+        base, ext = os.path.splitext(ip_file)
+        output_file = f"{base}_parmetis{ext if ext else ''}"
+        with open(output_file, 'w', encoding='utf-8') as file:
+            # Write each IP address with the appended port information
+            for ip in ip_addresses:
+                ip = ip.strip()  # Remove any leading/trailing whitespace
+                file.write(f"{ip} port={port}\n")
+        return output_file
+
+    def _assign_partitions(self, num_partitions: int, partition_dir: str):
+        ip_file = self.assigned_port(self.metis_config.ip_list)
+        # Execute each step in sequence and stop if any fails
+        if not self._launch_preprocess(num_partitions, self.metis_config.input_path,
+                                       ip_file, self.metis_config.dgl_tool_path,
+                                       self.metis_config.metadata_filename):
+            raise RuntimeError("Stopping execution due to failure in the preprocess step")
+        if not self._launch_parmetis(num_partitions, self.metis_config.input_path,
+                                     ip_file, self.metadata_dict["graph_name"]):
+            raise RuntimeError("Stopping execution due to failure in the ParMETIS partition step")
+        if not self._launch_postprocess(num_partitions, self.metis_config.input_path,
+                                        self.metis_config.dgl_tool_path,
+                                        self.metis_config.metadata_filename,
+                                        self.metadata_dict["graph_name"], partition_dir):
+            raise RuntimeError("Stopping execution due to failure in the postprocess step")
+
+        logging.info("Finished all ParMETIS steps.")
+
+    def _create_metadata(self, num_partitions: int, partition_dir: str):
+        partition_meta = {
+            "algo_name": "metis",
+            "num_parts": num_partitions,
+            "version": "1.0.0"
+        }
+        partition_meta_filepath = os.path.join(partition_dir, "partition_meta.json")
+        with open(partition_meta_filepath, "w", encoding='utf-8') as metafile:
+            json.dump(partition_meta, metafile)
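To make the hostfile handling concrete, the standalone sketch below mirrors what `assigned_port` above produces from a plain IP list; the IP addresses are made up, but the `port=2222` suffix matches the `SSH_PORT` configured in the Docker image earlier in this patch:

```python
# Standalone sketch mirroring ParMetisPartitionAlgorithm.assigned_port():
# every IP gets an explicit "port=<port>" entry so mpirun can reach the sshd
# started by the ParMETIS Docker image on its non-default port.
import os

def make_parmetis_hostfile(ip_file: str, port: str = "2222") -> str:
    """Write <base>_parmetis<ext> with one '<ip> port=<port>' line per instance."""
    with open(ip_file, "r", encoding="utf-8") as f:
        ips = [line.strip() for line in f if line.strip()]
    base, ext = os.path.splitext(ip_file)
    output_file = f"{base}_parmetis{ext}"
    with open(output_file, "w", encoding="utf-8") as f:
        for ip in ips:
            f.write(f"{ip} port={port}\n")
    return output_file

# A file containing "172.31.0.10\n172.31.0.11\n" becomes a sibling
# *_parmetis file containing:
#   172.31.0.10 port=2222
#   172.31.0.11 port=2222
```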
diff --git a/python/graphstorm/gpartition/partition_config.py b/python/graphstorm/gpartition/partition_config.py
new file mode 100644
index 0000000000..3083ee1af9
--- /dev/null
+++ b/python/graphstorm/gpartition/partition_config.py
@@ -0,0 +1,39 @@
+"""
+    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+    ParMETIS partition configuration
+"""
+from dataclasses import dataclass
+
+@dataclass
+class ParMETISConfig:
+    """
+    Dataclass holding the configuration for the ParMETIS partitioning algorithm.
+
+    Parameters
+    ----------
+    ip_list: str
+        Path to a file storing the IP addresses of the instances in the partition cluster
+    input_path: str
+        Path to the input graph data
+    dgl_tool_path: str
+        Path to dgl/tools, which is added to the PYTHONPATH
+    metadata_filename: str
+        Name of the graph metadata (schema) file used in the ParMETIS step
+    """
+    ip_list: str
+    input_path: str
+    dgl_tool_path: str
+    metadata_filename: str
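Putting the two new classes together, the wiring mirrors what `dist_partition_graph.py` does above. This is a sketch only: it assumes the chunked-graph metadata JSON was already produced by GSProcessing, all paths are placeholders, and the public driver method on the `LocalPartitionAlgorithm` base class is not shown in this diff, so `create_partitions(...)` below is merely a stand-in for whatever that entry point is actually called:

```python
import json

from graphstorm.gpartition import ParMetisPartitionAlgorithm, ParMETISConfig

# Placeholder paths for a real GSProcessing output location.
input_path = "/data/gsprocessing-output"
metadata_filename = "chunked_graph_meta.json"

# DGL "chunked graph data" JSON; must contain "graph_name".
with open(f"{input_path}/{metadata_filename}", "r", encoding="utf-8") as f:
    metadata_dict = json.load(f)

config = ParMETISConfig(
    ip_list="/data/ip_list.txt",      # plain IP list; ports are appended internally
    input_path=input_path,
    dgl_tool_path="/root/dgl/tools",  # location of dgl/tools inside the ParMETIS image
    metadata_filename=metadata_filename,
)

partitioner = ParMetisPartitionAlgorithm(metadata_dict, config)
# Stand-in for the base-class driver that invokes _assign_partitions() and
# _create_metadata(); see partition_algo_base.py for the real method name.
partitioner.create_partitions(num_partitions=4, partition_dir="/data/partition_assignment")
```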