From a5b22ed37929125250462a38183ef8cbbd50ef17 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Wed, 14 Feb 2024 12:53:15 -0800
Subject: [PATCH 01/13] Add support for using WholeGraph to store/load
 cache_lm_emb

---
 python/graphstorm/config/argument.py | 19 ++++++
 python/graphstorm/gsf.py             |  9 ++-
 python/graphstorm/model/lm_embed.py  | 47 +++++++++++---
 python/graphstorm/model/utils.py     | 91 ++++++++++++++++++++++++++++
 4 files changed, 153 insertions(+), 13 deletions(-)

diff --git a/python/graphstorm/config/argument.py b/python/graphstorm/config/argument.py
index 8d9265e7a5..f0d28f7382 100644
--- a/python/graphstorm/config/argument.py
+++ b/python/graphstorm/config/argument.py
@@ -307,6 +307,7 @@ def verify_arguments(self, is_train):
         if self.node_lm_configs:
             _ = self.lm_infer_batch_size
             _ = self.freeze_lm_encoder_epochs
+            _ = self.use_wholegraph_cache_lm_embed
 
         if self.distill_lm_configs:
             _ = self.textual_data_path
@@ -690,6 +691,18 @@ def cache_lm_embed(self):
         else:
             return None
 
+    @property
+    def use_wholegraph_cache_lm_embed(self):
+        """ Whether to cache the LM embeddings in files using WholeGraph.
+        """
+        if hasattr(self, "_use_wholegraph_cache_lm_embed"):
+            if self._use_wholegraph_cache_lm_embed:
+                assert self.cache_lm_embed, "You must turn on cache_lm_embed " \
+                        "to cache LM embeddings with WholeGraph."
+            return self._use_wholegraph_cache_lm_embed
+        else:
+            return None
+
     ###################### general gnn model related ######################
     @property
     def model_encoder_type(self):
@@ -2481,6 +2494,12 @@ def _add_lm_model_args(parser):
                        help="Whether to cache the LM embeddings in files. " + \
                             "If the LM embeddings have been saved before, load the saved embeddings " + \
                             "instead of computing the LM embeddings again.")
+    group.add_argument("--use-wholegraph-cache-lm-embed",
+            type=lambda x: (str(x).lower() in ['true', '1']),
+            default=argparse.SUPPRESS,
+            help="Whether to use WholeGraph to cache the LM embeddings in files. 
" + \ + "If the LM embeddings have been saved before, load the saved embeddings " + \ + "instead of computing the LM embeddings again.") return parser def _add_rgat_args(parser): diff --git a/python/graphstorm/gsf.py b/python/graphstorm/gsf.py index ad2a49bf41..2ed6d8386c 100644 --- a/python/graphstorm/gsf.py +++ b/python/graphstorm/gsf.py @@ -512,12 +512,14 @@ def set_encoder(model, g, config, train_task): if config.node_lm_configs is not None: emb_path = os.path.join(os.path.dirname(config.part_config), "cached_embs") if config.cache_lm_embed else None + wg_cached_embed = config.use_wholegraph_cache_lm_embed if model_encoder_type == "lm": # only use language model(s) as input layer encoder(s) encoder = GSPureLMNodeInputLayer(g, config.node_lm_configs, - num_train=config.lm_train_nodes, - lm_infer_batch_size=config.lm_infer_batch_size, - cached_embed_path=emb_path) + num_train=config.lm_train_nodes, + lm_infer_batch_size=config.lm_infer_batch_size, + cached_embed_path=emb_path, + wg_cached_embed=wg_cached_embed) else: encoder = GSLMNodeEncoderInputLayer(g, config.node_lm_configs, feat_size, config.hidden_size, @@ -526,6 +528,7 @@ def set_encoder(model, g, config, train_task): dropout=config.dropout, use_node_embeddings=config.use_node_embeddings, cached_embed_path=emb_path, + wg_cached_embed=wg_cached_embed, force_no_embeddings=config.construct_feat_ntype) else: encoder = GSNodeEncoderInputLayer(g, feat_size, config.hidden_size, diff --git a/python/graphstorm/model/lm_embed.py b/python/graphstorm/model/lm_embed.py index 78acfdae67..8609905128 100644 --- a/python/graphstorm/model/lm_embed.py +++ b/python/graphstorm/model/lm_embed.py @@ -31,7 +31,12 @@ from .embed import GSNodeEncoderInputLayer from .lm_model import init_lm_model from .lm_model import get_lm_node_feats -from .utils import load_pytorch_embedding, save_pytorch_embedding +from .utils import ( + load_pytorch_embedding, + save_pytorch_embedding, + load_wholegraph_embedding, + save_wholegraph_embedding +) from ..utils import get_rank, get_world_size, create_dist_tensor from ..distributed import flush_data @@ -239,11 +244,12 @@ class LMCache: embed_path : str The path where the embedding files are stored. 
""" - def __init__(self, g, lm_models, embed_path=None): + def __init__(self, g, lm_models, embed_path=None, use_wg=False): self._g = g self._lm_models = lm_models self._lm_emb_cache = {} self._embed_path = embed_path + self._use_wg = use_wg self._lm_hash = '' def _get_model_hash(self, ntype): @@ -281,7 +287,11 @@ def _load_embeddings(self): logging.info("load LM embedding from %s for node type %s", embed_path, ntype) embed_name = embed_ndata_names[ntype] - self._lm_emb_cache[ntype] = load_pytorch_embedding(embed_path, + if self.use_wg: + self._lm_emb_cache[ntype] = load_wholegraph_embedding( + embed_path, embed_name) + else: + self._lm_emb_cache[ntype] = load_pytorch_embedding(embed_path, self._g.get_node_partition_policy(ntype), embed_name) if set(self._lm_emb_cache.keys()) == set(self._lm_models.ntypes): logging.debug("Successfully load all embeddings from the cache.") @@ -305,10 +315,16 @@ def _save_embeddings(self): embed_path = os.path.join(os.path.join( os.path.join(self._embed_path, "lm_cache"), ntype), self._get_model_name(ntype)) - save_pytorch_embedding(embed_path, - self._lm_emb_cache[ntype], - get_rank(), - get_world_size()) + if self.use_wg: + save_wholegraph_embedding(embed_path, + self._lm_emb_cache[ntype], + get_rank(), + get_world_size()) + else: + save_pytorch_embedding(embed_path, + self._lm_emb_cache[ntype], + get_rank(), + get_world_size()) def __len__(self): return len(self._lm_emb_cache) @@ -329,6 +345,12 @@ def ntypes(self): """ return self._lm_models.ntypes + @property + def use_wg(self): + """ Whether to use WholeGraph to store the embeddings. + """ + return self._use_wg + @property def embed_ndata_name(self): """ The embed name of the node data @@ -484,7 +506,9 @@ def __init__(self, num_train=0, lm_infer_batch_size=16, use_fp16=True, - cached_embed_path=None): + cached_embed_path=None, + wg_cached_embed=False): + super(GSPureLMNodeInputLayer, self).__init__(g) assert node_lm_configs is not None and len(node_lm_configs) > 0, \ "language model configurations must be provided" @@ -494,7 +518,8 @@ def __init__(self, self.lm_infer_batch_size = lm_infer_batch_size self.use_fp16 = use_fp16 self.use_cache = False - self.lm_emb_cache = LMCache(g, self._lm_models, embed_path=cached_embed_path) + self.lm_emb_cache = LMCache(g, self._lm_models, embed_path=cached_embed_path, + use_wg=wg_cached_embed) self._feat_size = self._lm_models.get_feat_size(self._lm_models.ntypes[0]) for lm_model in self._lm_models.lm_models: @@ -684,6 +709,7 @@ def __init__(self, use_node_embeddings=False, use_fp16=True, cached_embed_path=None, + wg_cached_embed=False, force_no_embeddings=None): assert node_lm_configs is not None and len(node_lm_configs) > 0, \ "language model configurations must be provided" @@ -705,7 +731,8 @@ def __init__(self, self.use_fp16 = use_fp16 self.lm_infer_batch_size = lm_infer_batch_size self.use_cache = False - self.lm_emb_cache = LMCache(g, lm_models, embed_path=cached_embed_path) + self.lm_emb_cache = LMCache(g, lm_models, embed_path=cached_embed_path, + use_wg=wg_cached_embed) super(GSLMNodeEncoderInputLayer, self).__init__( g, adjust_feat_size, embed_size, diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py index f197a94655..644ca8e460 100644 --- a/python/graphstorm/model/utils.py +++ b/python/graphstorm/model/utils.py @@ -565,6 +565,97 @@ def save_pytorch_embedding(emb_path, embedding, rank, world_size): embedding = embedding[start:end] th.save(embedding, os.path.join(emb_path, f'embed-{pad_file_index(rank)}.pt')) +def 
save_wholegraph_embedding(emb_path, embedding, rank, world_size):
+    """ Save a dist embedding tensor in binary format for WholeGraph.
+
+    Parameters
+    ----------
+    emb_path : str
+        The path to save the embedding files.
+    embedding : WholeGraphDistTensor
+        The WholeGraph dist tensor to save.
+    rank : int
+        Rank of the current process in a distributed environment.
+    world_size : int
+        World size in a distributed environment.
+    """
+    os.makedirs(emb_path, exist_ok=True)
+    # [04/16]: Only rank 0 can chmod to let all other ranks to write files.
+    if rank == 0:
+        # mode 767 means rwx-rw-rwx:
+        #     - owner of the folder can read, write, and execute;
+        #     - owner's group can read, write;
+        #     - others can read, write, and execute.
+        os.chmod(emb_path, 0o767)
+
+    # make sure the emb_path permission is changed before other processes start to save
+    barrier()
+
+    assert rank < world_size, \
+        f"Process rank {rank} must be smaller than the distributed cluster size {world_size}"
+
+    assert isinstance(embedding, WholeGraphDistTensor), \
+        "Input embedding must be a WholeGraphDistTensor."
+
+    emb_num = embedding.num_embeddings
+    emb_dim = embedding.embedding_dim
+    emb_dtype = embedding.dtype
+    emb_name = embedding.name
+    emb_info = {
+        "format": "binary",
+        "emb_num": str(emb_num),
+        "emb_dim": str(emb_dim),
+        "emb_dtype": str(emb_dtype),
+        "emb_name": emb_name,
+        "world_size": world_size
+    }
+
+    embedding.save_to_file(emb_path, file_prefix="wg-embed")
+    if rank == 0:
+        with open(os.path.join(emb_path, "emb_info.json"), 'w', encoding='utf-8') as f:
+            json.dump(emb_info, f, indent=4)
+
+def load_wholegraph_embedding(emb_path, name):
+    """ Load embedding tensor in binary format for WholeGraph.
+
+    Parameters
+    ----------
+    emb_path : str
+        The path where the embedding files are stored.
+    name : str
+        The name of the created distributed tensor.
+
+    Returns
+    -------
+    WholeGraphDistTensor : the loaded embeddings in WholeGraph.
+    """
+    with open(os.path.join(emb_path, "emb_info.json"), 'r', encoding='utf-8') as f:
+        emb_info = json.load(f)
+
+    emb_num = int(emb_info['emb_num'])
+    emb_dim = int(emb_info['emb_dim'])
+    world_size_in_save = int(emb_info['world_size'])
+    supported_dtypes = {
+        'torch.float32': th.float32,
+        'torch.float': th.float,
+        'torch.int64': th.int64,
+        'torch.int32': th.int32,
+    }
+    emb_dtype = supported_dtypes[emb_info['emb_dtype']]
+    dist_emb = WholeGraphDistTensor((emb_num, emb_dim), emb_dtype, name=name)
+    files = os.listdir(emb_path)
+    filtered_files = [file for file in files if file.startswith("wg-embed")]
+    num_files = len(filtered_files)
+    assert num_files > 0, "No WholeGraph embedding files found."
+    assert world_size_in_save == num_files, \
+        f"World size when saving the embedding {world_size_in_save} " \
+        f"doesn't match the number of files {num_files}."
+    dist_emb.load_from_file(emb_path, "wg-embed", num_files)
+    barrier()
+    return dist_emb
+
 def load_pytorch_embedding(emb_path, part_policy, name):
     """ Load embedding tensor in Pytorch format.
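The round trip of the two helpers above can be exercised in a single process as follows. This is a minimal sketch only: the NCCL/WholeGraph initialization mirrors the unit tests added later in this series (PATCH 03), while the one-GPU world size, the port, the tensor shape and name, and the /tmp path are illustrative assumptions rather than values taken from the patch.

# Minimal single-GPU sketch of the save/load round trip (illustrative only).
import os
import torch as th

from graphstorm.wholegraph import init_wholegraph, WholeGraphDistTensor
from graphstorm.model.utils import (save_wholegraph_embedding,
                                    load_wholegraph_embedding)

# One-process "cluster"; the address/port values are assumptions.
os.environ.update({"MASTER_ADDR": "localhost", "MASTER_PORT": "29501",
                   "RANK": "0", "WORLD_SIZE": "1",
                   "LOCAL_RANK": "0", "LOCAL_WORLD_SIZE": "1"})
th.cuda.set_device(0)
th.distributed.init_process_group(backend="nccl", world_size=1, rank=0)
init_wholegraph()

# Allocate a 100 x 16 fp16 LM-embedding cache in WholeGraph and save it.
# save_wholegraph_embedding() writes the wg-embed* binary shard(s) plus an
# emb_info.json file recording shape, dtype, name, and world size.
emb = WholeGraphDistTensor((100, 16), th.float16, name="n0_bert_emb")
save_wholegraph_embedding("/tmp/lm_cache/n0", emb, rank=0, world_size=1)

# load_wholegraph_embedding() reads emb_info.json back to recover the
# metadata, then maps the binary shard(s) into a new WholeGraphDistTensor.
emb2 = load_wholegraph_embedding("/tmp/lm_cache/n0", name="n0_bert_emb")
assert emb2.num_embeddings == 100 and emb2.embedding_dim == 16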
From 228fb6459738a96aafe2fc5c7e0544c09f781474 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Wed, 14 Feb 2024 13:09:00 -0800
Subject: [PATCH 02/13] Enable update cache

---
 python/graphstorm/model/lm_embed.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/python/graphstorm/model/lm_embed.py b/python/graphstorm/model/lm_embed.py
index 8609905128..254cf15fd8 100644
--- a/python/graphstorm/model/lm_embed.py
+++ b/python/graphstorm/model/lm_embed.py
@@ -38,6 +38,7 @@
     save_wholegraph_embedding
 )
 from ..utils import get_rank, get_world_size, create_dist_tensor
+from ..wholegraph import WholeGraphDistTensor
 from ..distributed import flush_data
 
 class LMModels(nn.Module):
@@ -397,12 +398,18 @@ def update_cache(self, lm_infer_batch_size, use_fp16=True):
                 hidden_size = lm_model.feat_size
                 if ntype not in self._lm_emb_cache:
                     embed_name = embed_ndata_names[ntype]
-                    self._lm_emb_cache[ntype] = create_dist_tensor(
-                        (self._g.number_of_nodes(ntype), hidden_size),
-                        name=embed_name,
-                        dtype=th.float16 if use_fp16 else th.float32,
-                        part_policy=self._g.get_node_partition_policy(ntype),
-                        persistent=True)
+                    if self.use_wg:
+                        self._lm_emb_cache[ntype] = WholeGraphDistTensor(
+                            (self._g.number_of_nodes(ntype), hidden_size),
+                            dtype=th.float16 if use_fp16 else th.float32,
+                            name=embed_name)
+                    else:
+                        self._lm_emb_cache[ntype] = create_dist_tensor(
+                            (self._g.number_of_nodes(ntype), hidden_size),
+                            name=embed_name,
+                            dtype=th.float16 if use_fp16 else th.float32,
+                            part_policy=self._g.get_node_partition_policy(ntype),
+                            persistent=True)
                 emb = self._lm_emb_cache[ntype]
                 # LM computations are very computationally expensive. It's better to force
                 # an even split to ensure all processes have roughly the same number of nodes
@@ -423,10 +430,11 @@ def update_cache(self, lm_infer_batch_size, use_fp16=True):
                             fname: feat[input_nodes] for fname, feat in lm_node_feat.items()
                         }
                         text_embs = lm_model(input_ntypes, input_lm_feats)
+                    device = 'cuda' if self.use_wg else 'cpu'
                     if use_fp16:
-                        emb[input_nodes] = text_embs[ntype].half().to('cpu')
+                        emb[input_nodes] = text_embs[ntype].half().to(device)
                     else:
-                        emb[input_nodes] = text_embs[ntype].to('cpu')
+                        emb[input_nodes] = text_embs[ntype].to(device)
                     if i % 1000 == 0 and get_rank() == 0:
                         logging.debug("Compute LM embeddings on %d batches out of %d",
                                       i, len(node_list))

From 66c1f4701d70d3610e70267afb055111c6cac0cf Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Wed, 14 Feb 2024 14:50:11 -0800
Subject: [PATCH 03/13] Add unit tests

---
 python/graphstorm/model/utils.py       |   4 +++-
 tests/unit-tests/ip_config.txt         |   1 +
 tests/unit-tests/temp                  |   3 +
 tests/unit-tests/test_embed.py         | 131 +++++++++++++++++++++++++
 tests/unit-tests/test_wg_sparse_emb.py |   2 -
 5 files changed, 138 insertions(+), 3 deletions(-)
 create mode 100644 tests/unit-tests/ip_config.txt
 create mode 100644 tests/unit-tests/temp

diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py
index 644ca8e460..2cb3b709b8 100644
--- a/python/graphstorm/model/utils.py
+++ b/python/graphstorm/model/utils.py
@@ -638,12 +638,14 @@ def load_wholegraph_embedding(emb_path, name):
     emb_dim = int(emb_info['emb_dim'])
     world_size_in_save = int(emb_info['world_size'])
     supported_dtypes = {
+        'torch.half': th.half,
+        'torch.float16': th.float16,
         'torch.float32': th.float32,
         'torch.float': th.float,
         'torch.int64': th.int64,
-        'torch.int32': th.int32,
+        'torch.int32': th.int32
     }
     emb_dtype = supported_dtypes[emb_info['emb_dtype']]
     dist_emb = WholeGraphDistTensor((emb_num, emb_dim), emb_dtype, name=name)
     files = os.listdir(emb_path)
 filtered_files = [file for file in files if file.startswith("wg-embed")]
diff --git a/tests/unit-tests/ip_config.txt b/tests/unit-tests/ip_config.txt
new file mode 100644
index 0000000000..16fb14e919
--- /dev/null
+++ b/tests/unit-tests/ip_config.txt
@@ -0,0 +1 @@
+127.0.0.1 16340
diff --git a/tests/unit-tests/temp b/tests/unit-tests/temp
new file mode 100644
index 0000000000..8564fffc17
--- /dev/null
+++ b/tests/unit-tests/temp
@@ -0,0 +1,3 @@
+{
+    "dtype": []
+}
\ No newline at end of file
diff --git a/tests/unit-tests/test_embed.py b/tests/unit-tests/test_embed.py
index 905d9c5792..f0619edae6 100644
--- a/tests/unit-tests/test_embed.py
+++ b/tests/unit-tests/test_embed.py
@@ -37,6 +37,7 @@
 from graphstorm.model.utils import (LazyDistTensor,
                                     load_pytorch_embedding,
                                     save_pytorch_embedding)
+from graphstorm.wholegraph import init_wholegraph
 
 from data_utils import generate_dummy_dist_graph
 from data_utils import create_lm_graph, create_lm_graph2, load_lm_graph
@@ -252,6 +253,9 @@ def test_lm_cache():
     th.distributed.destroy_process_group()
     dgl.distributed.kvstore.close_kvstore()
 
+
+
+
 def run_dist_cache(part_config, tmpdirname):
     gs.initialize(ip_config=None, backend="gloo")
     g, lm_config = load_lm_graph(part_config)
@@ -543,6 +547,130 @@ def load_tensor(mode_get_world_size,
             load_tensor()
     assert_almost_equal(emb.numpy(), out_emb.numpy())
 
+
+def _wg_finalize():
+    import pylibwholegraph.torch as wgth
+    wgth.finalize()
+    # below patch fix (manually reset wg comm) will not be needed
+    # once PR: https://github.com/rapidsai/wholegraph/pull/111 is merged.
+    import pylibwholegraph.torch.comm as wgth_comm
+    wgth_comm.global_communicators = {}
+    wgth_comm.local_node_communicator = None
+    wgth_comm.local_device_communicator = None
+
+
+def _wg_initialize(proc_id=0, nprocs=1):
+    from dgl.distributed import role
+    # Need to fix the wholegraph initializer:
+    # Options.local_rank = get_rank() % role.get_num_trainers() requires role to be initialized
+    role.init_role("default")
+    backend = "nccl"
+    assert th.cuda.is_available(), "NCCL backend requires CUDA device(s) to be available."
+    os.environ["MASTER_ADDR"] = "localhost"
+    os.environ["MASTER_PORT"] = "29501"
+    os.environ["RANK"] = str(proc_id)
+    os.environ["WORLD_SIZE"] = str(nprocs)
+    os.environ["LOCAL_RANK"] = str(proc_id)
+    os.environ["LOCAL_WORLD_SIZE"] = str(nprocs)
+
+    th.cuda.set_device(proc_id)  # necessary: bind this process to its own GPU
+    th.distributed.init_process_group(backend=backend, world_size=nprocs, rank=proc_id)
+    init_wholegraph()
+
+
+def test_wg_lm_cache():
+    # initialize the torch distributed environment
+    pytest.importorskip("pylibwholegraph.torch")
+    if not th.cuda.is_available():
+        pytest.skip("Skip test_wg_lm_cache due to no GPU devices.")
+
+    _wg_initialize()
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        lm_config, feat_size, input_ids, attention_mask, g, _ = \
+            create_lm_graph(tmpdirname)
+
+        lm_models = LMModels(g, lm_config, 0, 10)
+        lm_cache = LMCache(g, lm_models, tmpdirname, use_wg=True)
+        ret = lm_cache.update_cache(100)
+        assert ret == True  # This is the first time we need to compute the BERT embeddings.
+        assert len(lm_cache) == 1
+        assert len(lm_cache.ntypes) == 1
+        assert lm_cache.ntypes[0] == 'n0'
+
+        lm_models._lm_models["n0"].lm_model.init_weights()
+        # Create the second cache. It should load the embeddings from
+        # the first cache.
+        lm_cache2 = LMCache(g, lm_models, tmpdirname, use_wg=True)
+        ret = lm_cache2.update_cache(100)
+        assert ret == False  # It loads LM embeddings directly.
+        assert len(lm_cache2) == 1
+        emb1 = lm_cache["n0"]
+        emb2 = lm_cache2["n0"]
+        emb1, _ = emb1.get_local_tensor()
+        emb2, _ = emb2.get_local_tensor()
+        assert np.all(emb1.numpy() == emb2.numpy())
+        ret = lm_cache2.update_cache(100)
+        assert ret == False  # It uses the loaded embeddings.
+        lm_cache2.clear_cache()
+        ret = lm_cache2.update_cache(100)
+        assert ret == False  # Load LM embeddings from the disk.
+
+        # If the model is changed, the model name should also be changed.
+        model_name = lm_cache._get_model_name("n0")
+        for param in lm_models.get_lm_model("n0").parameters():
+            param.data += 1
+        model_name1 = lm_cache._get_model_name("n0")
+        assert model_name != model_name1
+    _wg_finalize()
+    th.distributed.destroy_process_group() if th.distributed.is_initialized() else None
+
+
+def run_wg_dist_cache(proc_id, nprocs, part_config, tmpdirname):
+    gs.initialize(ip_config=None, backend="nccl")
+    _wg_initialize(proc_id, nprocs)
+    g, lm_config = load_lm_graph(part_config)
+    lm_models = LMModels(g, lm_config, 0, 10)
+    lm_cache = LMCache(g, lm_models, tmpdirname, use_wg=True)
+    lm_cache.update_cache(100)
+    assert len(lm_cache) == 1
+    assert len(lm_cache.ntypes) == 1
+    assert lm_cache.ntypes[0] == 'n0'
+
+    # Create the second cache. It should load the embeddings from
+    # the first cache.
+    lm_cache2 = LMCache(g, lm_models, tmpdirname, use_wg=True)
+    lm_cache2.update_cache(100)
+    assert len(lm_cache2) == 1
+    emb1 = lm_cache["n0"]
+    emb2 = lm_cache2["n0"]
+    emb1, _ = emb1.get_local_tensor()
+    emb2, _ = emb2.get_local_tensor()
+    assert np.all(emb1.numpy() == emb2.numpy())
+    _wg_finalize()
+    th.distributed.destroy_process_group() if th.distributed.is_initialized() else None
+
+
+@pytest.mark.parametrize("world_size", [1, 3, 4])
+def test_mp_wg_lm_cache(world_size):
+    pytest.importorskip("pylibwholegraph.torch")
+    if world_size > th.cuda.device_count():
+        pytest.skip("Skip test_mp_wg_lm_cache due to insufficient GPU devices.")
+    os.environ["OMP_NUM_THREADS"] = str(mp.cpu_count() // 2 // world_size)
+    ctx = mp.get_context("spawn")
+    ptrainer_list = []
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        lm_config, feat_size, input_ids, attention_mask, _, part_config = \
+            create_lm_graph(tmpdirname)
+        for rank in range(world_size):
+            p = ctx.Process(target=run_wg_dist_cache, args=(rank, world_size, part_config, tmpdirname))
+            p.start()
+            ptrainer_list.append(p)
+
+        for p in ptrainer_list:
+            p.join()
+            assert p.exitcode == 0
+
+
 if __name__ == '__main__':
     test_pytroch_emb_load_save(11)
     test_lm_cache()
@@ -564,3 +692,6 @@
     test_lm_embed_warmup('cpu')
     test_lm_embed_warmup('cuda:0')
     test_lm_infer()
+
+    test_wg_lm_cache()
+    test_mp_wg_lm_cache(1)
diff --git a/tests/unit-tests/test_wg_sparse_emb.py b/tests/unit-tests/test_wg_sparse_emb.py
index 50bb264de0..f5fc739dd2 100644
--- a/tests/unit-tests/test_wg_sparse_emb.py
+++ b/tests/unit-tests/test_wg_sparse_emb.py
@@ -90,8 +90,6 @@ def reset_envs():
 
 def _initialize(proc_id, nprocs, use_wholegraph=True):
     backend = "nccl"
-    #from dgl.distributed import role
-    #role.init_role("default")
     assert th.cuda.is_available(), "NCCL backend requires CUDA device(s) to be available."
os.environ["MASTER_ADDR"] = "localhost" os.environ["MASTER_PORT"] = "29501" From 1be3fd896327b9385302a50e2b1868573655f7fc Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Wed, 14 Feb 2024 14:52:39 -0800 Subject: [PATCH 04/13] Cleaning --- tests/unit-tests/ip_config.txt | 1 - tests/unit-tests/temp | 3 --- tests/unit-tests/test_embed.py | 3 --- 3 files changed, 7 deletions(-) delete mode 100644 tests/unit-tests/ip_config.txt delete mode 100644 tests/unit-tests/temp diff --git a/tests/unit-tests/ip_config.txt b/tests/unit-tests/ip_config.txt deleted file mode 100644 index 16fb14e919..0000000000 --- a/tests/unit-tests/ip_config.txt +++ /dev/null @@ -1 +0,0 @@ -127.0.0.1 16340 diff --git a/tests/unit-tests/temp b/tests/unit-tests/temp deleted file mode 100644 index 8564fffc17..0000000000 --- a/tests/unit-tests/temp +++ /dev/null @@ -1,3 +0,0 @@ -{ - "dtype": [] -} \ No newline at end of file diff --git a/tests/unit-tests/test_embed.py b/tests/unit-tests/test_embed.py index f0619edae6..93799fb75b 100644 --- a/tests/unit-tests/test_embed.py +++ b/tests/unit-tests/test_embed.py @@ -253,9 +253,6 @@ def test_lm_cache(): th.distributed.destroy_process_group() dgl.distributed.kvstore.close_kvstore() - - - def run_dist_cache(part_config, tmpdirname): gs.initialize(ip_config=None, backend="gloo") g, lm_config = load_lm_graph(part_config) From 59cfdffc3a9831979e887a24f64d933139e025fe Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Fri, 16 Feb 2024 14:15:06 -0800 Subject: [PATCH 05/13] Address comment (partly) --- python/graphstorm/model/lm_embed.py | 3 +- python/graphstorm/model/utils.py | 60 ++++++++++++++++++---- python/graphstorm/wholegraph/wholegraph.py | 11 ++++ 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/python/graphstorm/model/lm_embed.py b/python/graphstorm/model/lm_embed.py index 254cf15fd8..6b4dfbf2b4 100644 --- a/python/graphstorm/model/lm_embed.py +++ b/python/graphstorm/model/lm_embed.py @@ -320,7 +320,8 @@ def _save_embeddings(self): save_wholegraph_embedding(embed_path, self._lm_emb_cache[ntype], get_rank(), - get_world_size()) + get_world_size(), + fmt="pytorch") else: save_pytorch_embedding(embed_path, self._lm_emb_cache[ntype], diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py index 2cb3b709b8..7f9a19b4a1 100644 --- a/python/graphstorm/model/utils.py +++ b/python/graphstorm/model/utils.py @@ -565,7 +565,7 @@ def save_pytorch_embedding(emb_path, embedding, rank, world_size): embedding = embedding[start:end] th.save(embedding, os.path.join(emb_path, f'embed-{pad_file_index(rank)}.pt')) -def save_wholegraph_embedding(emb_path, embedding, rank, world_size): +def save_wholegraph_embedding(emb_path, embedding, rank, world_size, fmt="binary"): """ Save Dist embedding tensor in binary format for WholeGraph. Parameters @@ -578,7 +578,11 @@ def save_wholegraph_embedding(emb_path, embedding, rank, world_size): Rank of the current process in a distributed environment. world_size : int World size in a distributed env. + fmt : str + The format of the saved embeddings. Currently only support "binary" and "pytorch". """ + assert fmt in ["binary", "pytorch"], \ + "Using WholeGraph, the supported formats of the saved embeddings are 'binary' and 'pytorch'." os.makedirs(emb_path, exist_ok=True) # [04/16]: Only rank 0 can chmod to let all other ranks to write files. 
     if rank == 0:
@@ -601,16 +605,30 @@ def save_wholegraph_embedding(emb_path, embedding, rank, world_size):
     emb_dim = embedding.embedding_dim
     emb_dtype = embedding.dtype
     emb_name = embedding.name
+    emb_fmt = "wholegraph-" + fmt
     emb_info = {
-        "format": "binary",
+        "format": emb_fmt,
         "emb_num": str(emb_num),
         "emb_dim": str(emb_dim),
         "emb_dtype": str(emb_dtype),
         "emb_name": emb_name,
         "world_size": world_size
     }
+    if fmt == "binary":
+        # use binary format to save the embedding (supported by native WholeGraph APIs)
+        # Example: wg-embed_part_0_of_2, wg-embed_part_1_of_2
+        # Pros: WholeGraph's native API can load the embedding directly, with
+        #       no RAM duplication, and it supports save/load with different world sizes.
+        embedding.save_to_file(emb_path, file_prefix="wg-embed")
+    elif fmt == "pytorch":
+        # use pytorch format to save the embedding (dump the local tensor to a .pt file)
+        # Example: embed-00000.pt, embed-00001.pt
+        # Pros: compatible with the format used when WholeGraph is not enabled,
+        #       but it still follows WholeGraph's even partition policy and
+        #       duplicates RAM when loading.
+        emb = embedding.get_local_tensor()[0]
+        wg_rank = embedding.get_comm().get_rank()
+        th.save(emb, os.path.join(emb_path, f'embed-{pad_file_index(wg_rank)}.pt'))
 
-    embedding.save_to_file(emb_path, file_prefix="wg-embed")
     if rank == 0:
         with open(os.path.join(emb_path, "emb_info.json"), 'w', encoding='utf-8') as f:
             json.dump(emb_info, f, indent=4)
 
@@ -634,6 +652,10 @@ def load_wholegraph_embedding(emb_path, name):
     with open(os.path.join(emb_path, "emb_info.json"), 'r', encoding='utf-8') as f:
         emb_info = json.load(f)
 
+    emb_fmt = emb_info['format']
+    assert emb_fmt.startswith("wholegraph-"), \
+        "The format of the saved embeddings should start with 'wholegraph-'."
+    emb_fmt = emb_fmt.split("-")[1]
     emb_num = int(emb_info['emb_num'])
     emb_dim = int(emb_info['emb_dim'])
     world_size_in_save = int(emb_info['world_size'])
@@ -647,14 +669,30 @@ def load_wholegraph_embedding(emb_path, name):
     }
     emb_dtype = supported_dtypes[emb_info['emb_dtype']]
     dist_emb = WholeGraphDistTensor((emb_num, emb_dim), emb_dtype, name=name)
-    files = os.listdir(emb_path)
-    filtered_files = [file for file in files if file.startswith("wg-embed")]
-    num_files = len(filtered_files)
-    assert num_files > 0, "No WholeGraph embedding files found."
-    assert world_size_in_save == num_files, \
-        f"World size when saving the embedding {world_size_in_save} " \
-        f"doesn't match the number of files {num_files}."
-    dist_emb.load_from_file(emb_path, "wg-embed", num_files)
+    if emb_fmt == "pytorch":
+        assert dist_emb.get_comm().get_size() == world_size_in_save, \
+            "World size when saving the embedding is different from the current " \
+            "world size. Please switch to the binary format."
+        wg_rank = dist_emb.get_comm().get_rank()
+        file_path = os.path.join(emb_path, f'embed-{pad_file_index(wg_rank)}.pt')
+        assert os.path.exists(file_path), \
+            f"Embedding file {file_path} of my rank {wg_rank} doesn't exist."
+        emb = th.load(file_path)
+        local_emb = dist_emb.get_local_tensor()[0]
+        assert emb.shape[0] == local_emb.shape[0] and emb.shape[1] == local_emb.shape[1], \
+            "Embedding shape does not match!"
+        assert emb.dtype == local_emb.dtype, "Embedding datatypes do not match!"
+        local_emb.copy_(emb)
+    elif emb_fmt == "binary":
+        files = os.listdir(emb_path)
+        filtered_files = [file for file in files if file.startswith("wg-embed")]
+        num_files = len(filtered_files)
+        assert num_files > 0, "No WholeGraph embedding files found."
+        assert world_size_in_save == num_files, \
+            f"World size when saving the embedding {world_size_in_save} " \
+            f"doesn't match the number of files {num_files}."
+        dist_emb.load_from_file(emb_path, "wg-embed", num_files)
+
     barrier()
     return dist_emb

diff --git a/python/graphstorm/wholegraph/wholegraph.py b/python/graphstorm/wholegraph/wholegraph.py
index 57dd6b0dce..4056de7b2e 100644
--- a/python/graphstorm/wholegraph/wholegraph.py
+++ b/python/graphstorm/wholegraph/wholegraph.py
@@ -560,6 +560,17 @@ def get_local_tensor(self):
         )
         return local_tensor, offset
 
+    def get_comm(self):
+        """
+        Get the communicator of the WholeGraph embedding.
+
+        Returns
+        -------
+        WholeMemoryCommunicator
+            The WholeGraph global communicator of the WholeGraph embedding.
+        """
+        return self._tensor.get_embedding_tensor().get_comm()
+
     def _reset_storage(self):
         """Reset the storage of the WholeGraph embedding."""
         self._tensor = None

From 9a4b95fce6593c28764a2aabfefe2cb98cd5c2c3 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Fri, 16 Feb 2024 17:31:00 -0800
Subject: [PATCH 06/13] Address comment (add a unified flag to turn on
 WholeGraph)

---
 python/graphstorm/config/argument.py | 39 ++++++++++++++--------------
 python/graphstorm/gsf.py             |  5 ++--
 python/graphstorm/model/lm_embed.py  |  2 +-
 3 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/python/graphstorm/config/argument.py b/python/graphstorm/config/argument.py
index f0d28f7382..5cb6ee6c88 100644
--- a/python/graphstorm/config/argument.py
+++ b/python/graphstorm/config/argument.py
@@ -261,6 +261,7 @@ def verify_arguments(self, is_train):
         _ = self.node_id_mapping_file
         _ = self.edge_id_mapping_file
         _ = self.verbose
+        _ = self.use_wholegraph
 
         # Data
         _ = self.node_feat_name
@@ -307,7 +308,6 @@ def verify_arguments(self, is_train):
         if self.node_lm_configs:
             _ = self.lm_infer_batch_size
             _ = self.freeze_lm_encoder_epochs
-            _ = self.use_wholegraph_cache_lm_embed
 
         if self.distill_lm_configs:
             _ = self.textual_data_path
@@ -538,6 +538,18 @@ def verbose(self):
 
         return False
 
+    @property
+    def use_wholegraph(self):
+        """ Whether to use WholeGraph to store intermediate embeddings/tensors generated
+            during training or inference, e.g., cache_lm_emb, sparse_emb, etc.
+        """
+        if hasattr(self, "_use_wholegraph"):
+            assert self._use_wholegraph in [True, False], \
+                "Invalid value for _use_wholegraph. Must be either True or False."
+            return self._use_wholegraph
+        else:
+            return None
+
     ###################### language model support #########################
     # Bert related
     @property
@@ -691,18 +703,6 @@ def cache_lm_embed(self):
         else:
             return None
 
-    @property
-    def use_wholegraph_cache_lm_embed(self):
-        """ Whether to cache the LM embeddings in files using WholeGraph.
-        """
-        if hasattr(self, "_use_wholegraph_cache_lm_embed"):
-            if self._use_wholegraph_cache_lm_embed:
-                assert self.cache_lm_embed, "You must turn on cache_lm_embed " \
-                        "to cache LM embeddings with WholeGraph."
-            return self._use_wholegraph_cache_lm_embed
-        else:
-            return None
-
     ###################### general gnn model related ######################
     @property
     def model_encoder_type(self):
@@ -2298,6 +2298,13 @@ def _add_initialization_args(parser):
         default=argparse.SUPPRESS,
         help="Print more information.",
     )
+    group.add_argument(
+        "--use-wholegraph",
+        type=lambda x: (str(x).lower() in ['true', '1']),
+        default=argparse.SUPPRESS,
+        help="Whether to use WholeGraph to store intermediate embeddings/tensors generated \
+            during training or inference, e.g., cache_lm_emb, sparse_emb, etc."
+    )
     return parser
 
 def _add_gsgnn_basic_args(parser):
@@ -2494,12 +2501,6 @@ def _add_lm_model_args(parser):
                        help="Whether to cache the LM embeddings in files. " + \
                             "If the LM embeddings have been saved before, load the saved embeddings " + \
                             "instead of computing the LM embeddings again.")
-    group.add_argument("--use-wholegraph-cache-lm-embed",
-            type=lambda x: (str(x).lower() in ['true', '1']),
-            default=argparse.SUPPRESS,
-            help="Whether to use WholeGraph to cache the LM embeddings in files. " + \
-                 "If the LM embeddings have been saved before, load the saved embeddings " + \
-                 "instead of computing the LM embeddings again.")
     return parser
 
 def _add_rgat_args(parser):
diff --git a/python/graphstorm/gsf.py b/python/graphstorm/gsf.py
index 2ed6d8386c..1789ca7ae7 100644
--- a/python/graphstorm/gsf.py
+++ b/python/graphstorm/gsf.py
@@ -512,14 +512,13 @@ def set_encoder(model, g, config, train_task):
     if config.node_lm_configs is not None:
         emb_path = os.path.join(os.path.dirname(config.part_config),
                 "cached_embs") if config.cache_lm_embed else None
-        wg_cached_embed = config.use_wholegraph_cache_lm_embed
         if model_encoder_type == "lm":
             # only use language model(s) as input layer encoder(s)
             encoder = GSPureLMNodeInputLayer(g, config.node_lm_configs,
                     num_train=config.lm_train_nodes,
                     lm_infer_batch_size=config.lm_infer_batch_size,
                     cached_embed_path=emb_path,
-                    wg_cached_embed=wg_cached_embed)
+                    wg_cached_embed=config.use_wholegraph)
         else:
             encoder = GSLMNodeEncoderInputLayer(g, config.node_lm_configs,
                     feat_size, config.hidden_size,
@@ -527,7 +527,7 @@ def set_encoder(model, g, config, train_task):
                     dropout=config.dropout,
                     use_node_embeddings=config.use_node_embeddings,
                     cached_embed_path=emb_path,
-                    wg_cached_embed=wg_cached_embed,
+                    wg_cached_embed=config.use_wholegraph,
                     force_no_embeddings=config.construct_feat_ntype)
     else:
         encoder = GSNodeEncoderInputLayer(g, feat_size, config.hidden_size,
diff --git a/python/graphstorm/model/lm_embed.py b/python/graphstorm/model/lm_embed.py
index 6b4dfbf2b4..2eb7178919 100644
--- a/python/graphstorm/model/lm_embed.py
+++ b/python/graphstorm/model/lm_embed.py
@@ -321,7 +321,7 @@ def _save_embeddings(self):
                                           self._lm_emb_cache[ntype],
                                           get_rank(),
                                           get_world_size(),
-                                          fmt="pytorch")
+                                          fmt="binary")
             else:
                 save_pytorch_embedding(embed_path,
                                        self._lm_emb_cache[ntype],

From a7d63364caa387f097f4261008ca6f1437c204b3 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Mon, 19 Feb 2024 09:56:52 -0800
Subject: [PATCH 07/13] Change arg name and update err msg

---
 python/graphstorm/config/argument.py | 14 +++++++-------
 python/graphstorm/gsf.py             |  4 ++--
 python/graphstorm/model/utils.py     |  4 +++-
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/python/graphstorm/config/argument.py b/python/graphstorm/config/argument.py
index 5cb6ee6c88..1afc83f929 100644
--- a/python/graphstorm/config/argument.py
+++ b/python/graphstorm/config/argument.py
@@ -261,7 +261,7 @@ def verify_arguments(self, is_train):
         _ = self.node_id_mapping_file
         _ = self.edge_id_mapping_file
         _ = self.verbose
-        _ = self.use_wholegraph
+        _ = self.use_wholegraph_embed
 
         # Data
         _ = self.node_feat_name
@@ -539,14 +539,14 @@ def verbose(self):
         return False
 
     @property
-    def use_wholegraph(self):
+    def use_wholegraph_embed(self):
         """ Whether to use WholeGraph to store intermediate embeddings/tensors generated
             during training or inference, e.g., cache_lm_emb, sparse_emb, etc.
         """
-        if hasattr(self, "_use_wholegraph"):
-            assert self._use_wholegraph in [True, False], \
-                "Invalid value for _use_wholegraph. Must be either True or False."
-            return self._use_wholegraph
+        if hasattr(self, "_use_wholegraph_embed"):
+            assert self._use_wholegraph_embed in [True, False], \
+                "Invalid value for _use_wholegraph_embed. Must be either True or False."
+            return self._use_wholegraph_embed
         else:
             return None
 
@@ -2299,7 +2299,7 @@ def _add_initialization_args(parser):
         help="Print more information.",
     )
     group.add_argument(
-        "--use-wholegraph",
+        "--use-wholegraph-embed",
         type=lambda x: (str(x).lower() in ['true', '1']),
         default=argparse.SUPPRESS,
         help="Whether to use WholeGraph to store intermediate embeddings/tensors generated \
diff --git a/python/graphstorm/gsf.py b/python/graphstorm/gsf.py
index 1789ca7ae7..af88e76bbe 100644
--- a/python/graphstorm/gsf.py
+++ b/python/graphstorm/gsf.py
@@ -518,7 +518,7 @@ def set_encoder(model, g, config, train_task):
                     num_train=config.lm_train_nodes,
                     lm_infer_batch_size=config.lm_infer_batch_size,
                     cached_embed_path=emb_path,
-                    wg_cached_embed=config.use_wholegraph)
+                    wg_cached_embed=config.use_wholegraph_embed)
         else:
             encoder = GSLMNodeEncoderInputLayer(g, config.node_lm_configs,
                     feat_size, config.hidden_size,
@@ -527,7 +527,7 @@ def set_encoder(model, g, config, train_task):
                     dropout=config.dropout,
                     use_node_embeddings=config.use_node_embeddings,
                     cached_embed_path=emb_path,
-                    wg_cached_embed=config.use_wholegraph,
+                    wg_cached_embed=config.use_wholegraph_embed,
                     force_no_embeddings=config.construct_feat_ntype)
     else:
         encoder = GSNodeEncoderInputLayer(g, feat_size, config.hidden_size,
diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py
index 33b36d25aa..615c960013 100644
--- a/python/graphstorm/model/utils.py
+++ b/python/graphstorm/model/utils.py
@@ -680,7 +680,9 @@ def load_wholegraph_embedding(emb_path, name):
         emb = th.load(file_path)
         local_emb = dist_emb.get_local_tensor()[0]
         assert emb.shape[0] == local_emb.shape[0] and emb.shape[1] == local_emb.shape[1], \
-            "Embedding shape does not match!"
+            f"Embedding shape of {name} does not match! " + \
+            f"Expect {local_emb.shape}, but get {emb.shape}."
         assert emb.dtype == local_emb.dtype, "Embedding datatypes do not match!"
local_emb.copy_(emb) elif emb_fmt == "binary": From 470cddd39b0621d0a41f62b5cdd7e215e6b4f64a Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 19 Feb 2024 14:58:09 -0800 Subject: [PATCH 08/13] Add e2e tests and update train scripts --- .../run/gsgnn_emb/gsgnn_node_emb.py | 3 ++- .../graphstorm/run/gsgnn_ep/ep_infer_gnn.py | 3 ++- python/graphstorm/run/gsgnn_ep/gsgnn_ep.py | 3 ++- python/graphstorm/run/gsgnn_lp/gsgnn_lp.py | 3 ++- .../graphstorm/run/gsgnn_lp/lp_infer_gnn.py | 3 ++- python/graphstorm/run/gsgnn_np/gsgnn_np.py | 3 ++- .../graphstorm/run/gsgnn_np/np_infer_gnn.py | 3 ++- .../end2end-tests/graphstorm-nc/mgpu_test.sh | 26 +++++++++++++++++++ 8 files changed, 40 insertions(+), 7 deletions(-) diff --git a/python/graphstorm/run/gsgnn_emb/gsgnn_node_emb.py b/python/graphstorm/run/gsgnn_emb/gsgnn_node_emb.py index a2f3626d46..3b35392ab2 100644 --- a/python/graphstorm/run/gsgnn_emb/gsgnn_node_emb.py +++ b/python/graphstorm/run/gsgnn_emb/gsgnn_node_emb.py @@ -35,8 +35,9 @@ def main(config_args): config.verify_arguments(True) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) rt_profiler.init(config.profile_path, rank=gs.get_rank()) sys_tracker.init(config.verbose, rank=gs.get_rank()) device = setup_device(config.local_rank) diff --git a/python/graphstorm/run/gsgnn_ep/ep_infer_gnn.py b/python/graphstorm/run/gsgnn_ep/ep_infer_gnn.py index 25f663781d..301fdecd99 100644 --- a/python/graphstorm/run/gsgnn_ep/ep_infer_gnn.py +++ b/python/graphstorm/run/gsgnn_ep/ep_infer_gnn.py @@ -44,8 +44,9 @@ def main(config_args): config.verify_arguments(False) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) device = setup_device(config.local_rank) infer_data = GSgnnEdgeInferData(config.graph_name, diff --git a/python/graphstorm/run/gsgnn_ep/gsgnn_ep.py b/python/graphstorm/run/gsgnn_ep/gsgnn_ep.py index 85e5680a41..404a8cc934 100644 --- a/python/graphstorm/run/gsgnn_ep/gsgnn_ep.py +++ b/python/graphstorm/run/gsgnn_ep/gsgnn_ep.py @@ -58,8 +58,9 @@ def main(config_args): config.verify_arguments(True) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) rt_profiler.init(config.profile_path, rank=gs.get_rank()) sys_tracker.init(config.verbose, rank=gs.get_rank()) device = setup_device(config.local_rank) diff --git a/python/graphstorm/run/gsgnn_lp/gsgnn_lp.py b/python/graphstorm/run/gsgnn_lp/gsgnn_lp.py index 34fc25ba3d..ab49f6d6af 100644 --- a/python/graphstorm/run/gsgnn_lp/gsgnn_lp.py +++ b/python/graphstorm/run/gsgnn_lp/gsgnn_lp.py @@ -98,8 +98,9 @@ def main(config_args): config.verify_arguments(True) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - 
use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) rt_profiler.init(config.profile_path, rank=gs.get_rank()) sys_tracker.init(config.verbose, rank=gs.get_rank()) device = setup_device(config.local_rank) diff --git a/python/graphstorm/run/gsgnn_lp/lp_infer_gnn.py b/python/graphstorm/run/gsgnn_lp/lp_infer_gnn.py index 073c529dbf..4bb7834186 100644 --- a/python/graphstorm/run/gsgnn_lp/lp_infer_gnn.py +++ b/python/graphstorm/run/gsgnn_lp/lp_infer_gnn.py @@ -40,8 +40,9 @@ def main(config_args): config.verify_arguments(False) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) device = setup_device(config.local_rank) infer_data = GSgnnEdgeInferData(config.graph_name, diff --git a/python/graphstorm/run/gsgnn_np/gsgnn_np.py b/python/graphstorm/run/gsgnn_np/gsgnn_np.py index 1fa1d2f615..4d5fd01a5f 100644 --- a/python/graphstorm/run/gsgnn_np/gsgnn_np.py +++ b/python/graphstorm/run/gsgnn_np/gsgnn_np.py @@ -62,8 +62,9 @@ def main(config_args): config.verify_arguments(True) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) rt_profiler.init(config.profile_path, rank=gs.get_rank()) sys_tracker.init(config.verbose, rank=gs.get_rank()) device = setup_device(config.local_rank) diff --git a/python/graphstorm/run/gsgnn_np/np_infer_gnn.py b/python/graphstorm/run/gsgnn_np/np_infer_gnn.py index 4d84a2ebda..22ee8b7f2c 100644 --- a/python/graphstorm/run/gsgnn_np/np_infer_gnn.py +++ b/python/graphstorm/run/gsgnn_np/np_infer_gnn.py @@ -43,8 +43,9 @@ def main(config_args): config.verify_arguments(False) use_wg_feats = use_wholegraph(config.part_config) + use_wg_embed = config.use_wholegraph_sparse_emb or config.use_wholegraph_embed gs.initialize(ip_config=config.ip_config, backend=config.backend, - use_wholegraph=config.use_wholegraph_sparse_emb or use_wg_feats) + use_wholegraph=use_wg_embed or use_wg_feats) device = setup_device(config.local_rank) infer_data = GSgnnNodeInferData(config.graph_name, diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh index 2e141ae149..a9f8f67667 100644 --- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh +++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh @@ -249,6 +249,32 @@ error_and_exit $? rm /tmp/train_log.txt +echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: BERT nodes: movie, user inference: mini-batch save model save emb node, use wholegraph for cache_lm_embed" +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true + +error_and_exit $? 
+ +cnt=$(ls -l /data/gsgnn_nc_ml_text/ | grep epoch | wc -l) +if test $cnt != 1 +then + echo "The number of save models $cnt is not equal to the specified topk 1" + exit -1 +fi + +best_epoch=$(grep "successfully save the model to" /tmp/train_log.txt | tail -1 | tr -d '\n' | tail -c 1) +echo "The best model is saved in epoch $best_epoch" + +rm /tmp/train_log.txt + +echo "*************use wholegraph cached LM embeddings" +# Run the model training again and this time it should load the BERT embeddings saved +# in the previous run. +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --use-wholegraph-embed true + +error_and_exit $? + +rm /tmp/train_log.txt + echo "**************dataset: Movielens, do inference on saved model, RGCN layer: 1, node feat: BERT nodes: movie, user" python3 -m graphstorm.run.gs_node_classification --inference --workspace $GS_HOME/inference_scripts/np_infer/ --num-trainers $NUM_INFERs --num-servers 2 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_text_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_nc_ml_text/infer-emb/ --restore-model-path /data/gsgnn_nc_ml_text/epoch-$best_epoch/ --save-prediction-path /data/gsgnn_nc_ml_text/prediction/ --logging-file /tmp/log.txt --logging-level debug --preserve-input True From 3f7b8ce11ebdd34e202f0faaab7b8ad2a1155095 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 19 Feb 2024 15:15:22 -0800 Subject: [PATCH 09/13] Fix lint --- python/graphstorm/model/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py index 615c960013..24b3a499d0 100644 --- a/python/graphstorm/model/utils.py +++ b/python/graphstorm/model/utils.py @@ -582,7 +582,8 @@ def save_wholegraph_embedding(emb_path, embedding, rank, world_size, fmt="binary The format of the saved embeddings. Currently only support "binary" and "pytorch". """ assert fmt in ["binary", "pytorch"], \ - "Using WholeGraph, the supported formats of the saved embeddings are 'binary' and 'pytorch'." + "Using WholeGraph, the supported formats of the saved embeddings " + \ + "are 'binary' and 'pytorch'." os.makedirs(emb_path, exist_ok=True) # [04/16]: Only rank 0 can chmod to let all other ranks to write files. if rank == 0: From fd3ad3931ce94c8e17e4d5f642cc559d0a9c3c53 Mon Sep 17 00:00:00 2001 From: Chang Liu Date: Mon, 19 Feb 2024 17:17:40 -0800 Subject: [PATCH 10/13] Fix mgpu CI test --- .../end2end-tests/graphstorm-nc/mgpu_test.sh | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh index a9f8f67667..1e8622e487 100644 --- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh +++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh @@ -249,12 +249,30 @@ error_and_exit $? 
rm /tmp/train_log.txt +echo "**************dataset: Movielens, do inference on saved model, RGCN layer: 1, node feat: BERT nodes: movie, user" +python3 -m graphstorm.run.gs_node_classification --inference --workspace $GS_HOME/inference_scripts/np_infer/ --num-trainers $NUM_INFERs --num-servers 2 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_text_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_nc_ml_text/infer-emb/ --restore-model-path /data/gsgnn_nc_ml_text/epoch-$best_epoch/ --save-prediction-path /data/gsgnn_nc_ml_text/prediction/ --logging-file /tmp/log.txt --logging-level debug --preserve-input True + +error_and_exit $? + +cnt=$(grep "| Test accuracy" /tmp/log.txt | wc -l) +if test $cnt -ne 1 +then + echo "We do test, should have test accuracy" + exit -1 +fi + +rm /tmp/log.txt + +python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/gsgnn_nc_ml_text/emb/ --infer-embout /data/gsgnn_nc_ml_text/infer-emb/ + +error_and_exit $? + echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: BERT nodes: movie, user inference: mini-batch save model save emb node, use wholegraph for cache_lm_embed" -python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_wg_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_wg_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true error_and_exit $? -cnt=$(ls -l /data/gsgnn_nc_ml_text/ | grep epoch | wc -l) +cnt=$(ls -l /data/gsgnn_wg_nc_ml_text/ | grep epoch | wc -l) if test $cnt != 1 then echo "The number of save models $cnt is not equal to the specified topk 1" @@ -276,7 +294,7 @@ error_and_exit $? 
 rm /tmp/train_log.txt
 
 echo "**************dataset: Movielens, do inference on saved model, RGCN layer: 1, node feat: BERT nodes: movie, user"
-python3 -m graphstorm.run.gs_node_classification --inference --workspace $GS_HOME/inference_scripts/np_infer/ --num-trainers $NUM_INFERs --num-servers 2 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_text_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_nc_ml_text/infer-emb/ --restore-model-path /data/gsgnn_nc_ml_text/epoch-$best_epoch/ --save-prediction-path /data/gsgnn_nc_ml_text/prediction/ --logging-file /tmp/log.txt --logging-level debug --preserve-input True
+python3 -m graphstorm.run.gs_node_classification --inference --workspace $GS_HOME/inference_scripts/np_infer/ --num-trainers $NUM_INFERs --num-servers 2 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_text_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_wg_nc_ml_text/infer-emb/ --restore-model-path /data/gsgnn_wg_nc_ml_text/epoch-$best_epoch/ --save-prediction-path /data/gsgnn_wg_nc_ml_text/prediction/ --logging-file /tmp/log.txt --logging-level debug --preserve-input True
 
 error_and_exit $?
 
 cnt=$(grep "| Test accuracy" /tmp/log.txt | wc -l)
 if test $cnt -ne 1
 then
     echo "We do test, should have test accuracy"
     exit -1
 fi
 
 rm /tmp/log.txt
 
-python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/gsgnn_nc_ml_text/emb/ --infer-embout /data/gsgnn_nc_ml_text/infer-emb/
+python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/gsgnn_wg_nc_ml_text/emb/ --infer-embout /data/gsgnn_wg_nc_ml_text/infer-emb/
 
 error_and_exit $?

From 611f366f7b868d6a7c1432337099f0406cf0dafc Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Tue, 20 Feb 2024 10:52:23 -0800
Subject: [PATCH 11/13] Address comment for e2e tests

---
 python/graphstorm/model/utils.py               |  7 +-
 tests/end2end-tests/graphstorm-nc/mgpu_test.sh | 92 ++++++++++---------
 2 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/python/graphstorm/model/utils.py b/python/graphstorm/model/utils.py
index 24b3a499d0..0969c95f5d 100644
--- a/python/graphstorm/model/utils.py
+++ b/python/graphstorm/model/utils.py
@@ -650,7 +650,12 @@ def load_wholegraph_embedding(emb_path, name):
     -------
     WholeGraphDistTensor : the loaded embeddings in WholeGraph.
     """
-    with open(os.path.join(emb_path, "emb_info.json"), 'r', encoding='utf-8') as f:
+    file_path = os.path.join(emb_path, "emb_info.json")
+    assert os.path.exists(file_path), \
+        f"Embedding JSON file: {file_path} not found. " + \
+        "This file is needed for loading embeddings with WholeGraph. It's generated " + \
+        "when you save embeddings with the '--use-wholegraph-embed' flag."
+    with open(file_path, 'r', encoding='utf-8') as f:
         emb_info = json.load(f)
 
diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
index 1e8622e487..de622cbf69 100644
--- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
+++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
@@ -267,50 +267,6 @@ python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/
 
 error_and_exit $?
-echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: BERT nodes: movie, user inference: mini-batch save model save emb node, use wholegraph for cache_lm_embed" -python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_wg_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_wg_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true - -error_and_exit $? - -cnt=$(ls -l /data/gsgnn_wg_nc_ml_text/ | grep epoch | wc -l) -if test $cnt != 1 -then - echo "The number of save models $cnt is not equal to the specified topk 1" - exit -1 -fi - -best_epoch=$(grep "successfully save the model to" /tmp/train_log.txt | tail -1 | tr -d '\n' | tail -c 1) -echo "The best model is saved in epoch $best_epoch" - -rm /tmp/train_log.txt - -echo "*************use wholegraph cached LM embeddings" -# Run the model training again and this time it should load the BERT embeddings saved -# in the previous run. -python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --use-wholegraph-embed true - -error_and_exit $? - -rm /tmp/train_log.txt - -echo "**************dataset: Movielens, do inference on saved model, RGCN layer: 1, node feat: BERT nodes: movie, user" -python3 -m graphstorm.run.gs_node_classification --inference --workspace $GS_HOME/inference_scripts/np_infer/ --num-trainers $NUM_INFERs --num-servers 2 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_text_infer.yaml --use-mini-batch-infer false --save-embed-path /data/gsgnn_wg_nc_ml_text/infer-emb/ --restore-model-path /data/gsgnn_wg_nc_ml_text/epoch-$best_epoch/ --save-prediction-path /data/gsgnn_wg_nc_ml_text/prediction/ --logging-file /tmp/log.txt --logging-level debug --preserve-input True - -error_and_exit $? - -cnt=$(grep "| Test accuracy" /tmp/log.txt | wc -l) -if test $cnt -ne 1 -then - echo "We do test, should have test accuracy" - exit -1 -fi - -rm /tmp/log.txt - -python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/gsgnn_wg_nc_ml_text/emb/ --infer-embout /data/gsgnn_wg_nc_ml_text/infer-emb/ - -error_and_exit $? - # Run the model training again and this time it should load the BERT embeddings saved # in the previous run. 
python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_movie_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_movie_utext.yaml --save-model-path /data/gsgnn_nc_ml_movie_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_nc_ml_text/emb/ --num-epochs 1 --construct-feat-ntype user --preserve-input True @@ -557,4 +513,52 @@ python3 $GS_HOME/tests/end2end-tests/check_np_infer_emb.py --train-embout /data/ error_and_exit $? rm -fr /data/gsgnn_wg_nc_ml/ + +echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: BERT nodes: movie, user inference: mini-batch save model save emb node, use wholegraph for cache_lm_embed" +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_wg_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_wg_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true --lm-train-nodes 0 + +error_and_exit $? + +cnt=$(find /data/movielen_100k_lm_encoder_train_val_1p_4t/ -type f -name "wg-embed*" | wc -l) +expected=$(( 2 * $NUM_TRAINERS )) +if test $cnt != $expected +then + echo "The number of saved wholegraph embeddings $cnt is not equal to the number of $NUM_TRAINERS * 2" + exit -1 +fi + +file_path=$(find /data/movielen_100k_lm_encoder_train_val_1p_4t/ -type f -name "emb_info.json" -print -quit) +if [ -n "$file_path" ]; then + if grep -q "wholegraph" "$file_path"; then + else + echo "The emb_info.json file at $file_path does not contain wholegraph as its format name." + exit -1 + fi +else + echo "The emb_info.json file is not found" + exit -1 +fi + +cnt=$(ls -l /data/gsgnn_wg_nc_ml_text/ | grep epoch | wc -l) +if test $cnt != 1 +then + echo "The number of save models $cnt is not equal to the specified topk 1" + exit -1 +fi + +best_epoch=$(grep "successfully save the model to" /tmp/train_log.txt | tail -1 | tr -d '\n' | tail -c 1) +echo "The best model is saved in epoch $best_epoch" + +rm /tmp/train_log.txt + +echo "*************use wholegraph cached LM embeddings" +# Run the model training again and this time it should load the BERT embeddings saved +# in the previous run. +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --use-wholegraph-embed true --lm-train-nodes 0 + +error_and_exit $? 
+
+rm /tmp/train_log.txt
+rm -fr /data/gsgnn_wg_nc_ml_text/
+
 rm -fr /tmp/*

From 0bd5a5c30632b57016a9223774401514fade6281 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Tue, 20 Feb 2024 11:22:24 -0800
Subject: [PATCH 12/13] Minor fix of syntax

---
 tests/end2end-tests/graphstorm-nc/mgpu_test.sh | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
index de622cbf69..9bb378b5ea 100644
--- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
+++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
@@ -529,8 +529,7 @@ fi
 
 file_path=$(find /data/movielen_100k_lm_encoder_train_val_1p_4t/ -type f -name "emb_info.json" -print -quit)
 if [ -n "$file_path" ]; then
-    if grep -q "wholegraph" "$file_path"; then
-    else
+    if ! grep -q "wholegraph" "$file_path"; then
         echo "The emb_info.json file at $file_path does not contain wholegraph as its format name."
         exit -1
     fi

From 188423e4f04729a2509f4c7d57cd009fdca0f759 Mon Sep 17 00:00:00 2001
From: Chang Liu
Date: Tue, 20 Feb 2024 12:05:30 -0800
Subject: [PATCH 13/13] Use NCCL backend for WG stability

---
 tests/end2end-tests/graphstorm-nc/mgpu_test.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
index 9bb378b5ea..0854440e35 100644
--- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
+++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh
@@ -515,7 +515,7 @@ rm -fr /data/gsgnn_wg_nc_ml/
 
 echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: BERT nodes: movie, user inference: mini-batch save model save emb node, use wholegraph for cache_lm_embed"
-python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_wg_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_wg_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true --lm-train-nodes 0
+python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --save-model-path /data/gsgnn_wg_nc_ml_text/ --topk-model-to-save 1 --save-embed-path /data/gsgnn_wg_nc_ml_text/emb/ --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --preserve-input True --use-wholegraph-embed true --lm-train-nodes 0 --backend nccl
 
 error_and_exit $?
 
@@ -553,7 +553,7 @@ rm /tmp/train_log.txt
 
 echo "*************use wholegraph cached LM embeddings"
 # Run the model training again and this time it should load the BERT embeddings saved
 # in the previous run.
-python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --use-wholegraph-embed true --lm-train-nodes 0 +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_lm_encoder_train_val_1p_4t/movie-lens-100k-text.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc_utext.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --cache-lm-embed true --use-wholegraph-embed true --lm-train-nodes 0 --backend nccl error_and_exit $?
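The emb_info.json file that the e2e test greps for above is the metadata written by save_wholegraph_embedding(). A hypothetical helper like the one below (not part of this PR) illustrates how the two on-disk formats introduced in PATCH 05 differ at load time: the "wholegraph-pytorch" layout keeps one embed-*.pt shard per saving rank and can only be loaded at the same world size, while "wholegraph-binary" can be reloaded by any number of processes.

# Hypothetical helper (not in this PR): decide whether a cached embedding
# directory can be loaded by the current job, based on the emb_info.json
# schema written by save_wholegraph_embedding() in PATCH 01/05.
import json
import os

def wg_cache_loadable(emb_path: str, cur_world_size: int) -> bool:
    """Return True if load_wholegraph_embedding() could load this cache."""
    info_file = os.path.join(emb_path, "emb_info.json")
    if not os.path.exists(info_file):
        return False
    with open(info_file, "r", encoding="utf-8") as f:
        info = json.load(f)
    fmt = info["format"]                      # e.g., "wholegraph-binary"
    if not fmt.startswith("wholegraph-"):
        return False
    if fmt == "wholegraph-pytorch":
        # One embed-*.pt shard per saving rank: the current world size must
        # match the world size recorded at save time.
        return int(info["world_size"]) == cur_world_size
    # The binary format is world-size agnostic: WholeGraph re-partitions the
    # wg-embed* files across however many ranks are loading.
    return True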