Merge branch 'hard-negative' into hard-test-negative
Xiang Song committed Dec 23, 2023
2 parents 440b8c0 + 326ff4a commit 5fff5fb
Showing 11 changed files with 842 additions and 92 deletions.
46 changes: 46 additions & 0 deletions docs/source/advanced/advanced-usages.rst
@@ -22,6 +22,52 @@ To be more specific, these dataloaders will do neighbor sampling regardless of a

With DGL 1.0.4, ``fast_localuniform`` dataloader can speedup 2.4X over ``localuniform`` dataloader on training a 2 layer RGCN on MAG dataset on four g5.48x instances.

Hard Negative Sampling in Link Prediction Training
--------------------------------------------------
GraphStorm allows users to define hard negative edges for a positive edge during link prediction training.
Currently, hard negative edges are constructed by replacing the destination nodes of positive edges with pre-defined hard negative nodes.
For example, given an edge (``src_pos``, ``dst_pos``) and its hard negative destination nodes ``hard_0`` and ``hard_1``, GraphStorm will construct two hard negative edges, i.e., (``src_pos``, ``hard_0``) and (``src_pos``, ``hard_1``).

The hard negatives are stored as edge features of the target edge type.
Users can provide the hard negatives for each edge type through ``train_etypes_negative_dstnode`` in the training config yaml.
For example, the following yaml block defines the hard negatives for edge type (``src_type``,``rel_type0``,``dst_type``) as the edge feature ``negative_nid_field_0`` and the hard negatives for edge type (``src_type``,``rel_type1``,``dst_type``) as the edge feature ``negative_nid_field_1``.

.. code-block:: yaml

    train_etypes_negative_dstnode:
      - src_type,rel_type0,dst_type:negative_nid_field_0
      - src_type,rel_type1,dst_type:negative_nid_field_1

Users can also define the number of hard negatives to sample for each edge type during training through ``num_train_hard_negatives`` in the training config yaml.
For example, the following yaml block sets the number of hard negatives to 5 for edge type (``src_type``,``rel_type0``,``dst_type``) and to 10 for edge type (``src_type``,``rel_type1``,``dst_type``).

.. code-block:: yaml

    num_train_hard_negatives:
      - src_type,rel_type0,dst_type:5
      - src_type,rel_type1,dst_type:10

Hard negative sampling can be used together with any link prediction negative sampler, such as ``uniform``, ``joint``, ``inbatch_joint``, etc.
By default, GraphStorm will sample hard negatives first to fulfill the requirement of ``num_train_hard_negatives`` and then sample random negatives to fulfill the requirement of ``num_negative_edges``.
In general, GraphStorm covers the following cases:

- **Case 1** ``num_train_hard_negatives`` is larger than or equal to ``num_negative_edges``. GraphStorm will only sample hard negative nodes.
- **Case 2** ``num_train_hard_negatives`` is smaller than ``num_negative_edges``. GraphStorm will randomly sample ``num_train_hard_negatives`` hard negative nodes from the hard negative set and then randomly sample ``num_negative_edges - num_train_hard_negatives`` negative nodes.
- **Case 3** Some edges do not have enough hard negatives provided by users. For example, the expected ``num_train_hard_negatives`` is 10, but an edge only has 5 hard negatives. In this case, GraphStorm will use all the provided hard negatives first and then randomly sample negative nodes to fulfill the requirement of ``num_train_hard_negatives``. Afterwards, **Case 1** or **Case 2** applies.
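The case logic above can be sketched as follows. This is a minimal illustration, not GraphStorm's actual implementation; the function name, ``hard_negs``, and ``all_nodes`` are hypothetical:

```python
import random

def sample_negatives(hard_negs, all_nodes, num_negative_edges, num_train_hard_negatives):
    """Sketch: combine hard negatives with random negatives for one positive edge.

    hard_negs: pre-defined hard negative node ids for this edge.
    all_nodes: candidate pool for random negative sampling.
    """
    # Case 3: not enough hard negatives provided -- pad the hard negative
    # set with random nodes until it reaches num_train_hard_negatives.
    if len(hard_negs) < num_train_hard_negatives:
        pad = random.choices(all_nodes, k=num_train_hard_negatives - len(hard_negs))
        hard_negs = list(hard_negs) + pad

    if num_train_hard_negatives >= num_negative_edges:
        # Case 1: all sampled negatives come from the hard negative set.
        return random.sample(hard_negs, num_negative_edges)

    # Case 2: take num_train_hard_negatives hard negatives, then fill the
    # remainder with random negatives.
    negs = random.sample(hard_negs, num_train_hard_negatives)
    negs += random.choices(all_nodes, k=num_negative_edges - num_train_hard_negatives)
    return negs
```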

**Preparing graph data for hard negative sampling**

The gconstruct pipeline of GraphStorm provides support to load hard negative data from raw input.
Hard destination negatives can be defined through ``edge_dst_hard_negative`` transformation.
The ``feature_col`` field of ``edge_dst_hard_negative`` must store the raw node ids of the hard destination nodes.
GraphStorm accepts two types of hard negative inputs:

- **An array of strings or integers** When the input format is ``Parquet``, the ``feature_col`` can store string or integer arrays. In this case, each row stores a string/integer array representing the hard negative node ids of the corresponding edge. For example, the ``feature_col`` can be a 2D string array, like ``[["e0_hard_0", "e0_hard_1"],["e1_hard_0"], ..., ["en_hard_0", "en_hard_1"]]``, or a 2D integer array (for integer node ids), like ``[[10,2],[3], ..., [4,12]]``. It is not required that every row have the same number of hard negatives; GraphStorm will automatically handle the case when some edges do not have enough pre-defined hard negatives.

- **A single string** When the input format is ``Parquet`` or ``CSV``, the ``feature_col`` can store strings instead of string arrays. In this case, a ``separator`` must be provided to split each string into node ids. The ``feature_col`` will be a 1D string list, for example ``["e0_hard_0;e0_hard_1", "e1_hard_1", ..., "en_hard_0;en_hard_1"]``. The number of hard negatives can vary from row to row; GraphStorm will automatically handle the case when some edges do not have enough hard negatives.
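For reference, a gconstruct edge entry using this transformation might look like the following sketch. The file path, type names, and the ``hard_neg`` column are placeholders, and the entry is abbreviated (fields such as the source/destination id columns are omitted):

```json
{
    "relation": ["src_type", "rel_type0", "dst_type"],
    "format": {"name": "parquet"},
    "files": ["edges/rel_type0.parquet"],
    "features": [
        {
            "feature_col": "hard_neg",
            "transform": {"name": "edge_dst_hard_negative",
                          "separator": ";"}
        }
    ]
}
```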

GraphStorm will automatically translate the raw node IDs of hard negatives into partition node IDs in a DistDGL graph.

Multiple Target Node Types Training
-------------------------------------

42 changes: 31 additions & 11 deletions python/graphstorm/gconstruct/construct_graph.py
@@ -44,6 +44,8 @@
from .utils import (multiprocessing_data_read,
update_two_phase_feat_ops, ExtMemArrayMerger,
partition_graph, ExtMemArrayWrapper)
from .utils import (get_hard_edge_negs_feats,
shuffle_hard_nids)

def prepare_node_data(in_file, feat_ops, read_file):
""" Prepare node data information for data transformation.
@@ -297,13 +299,13 @@ def process_node_data(process_confs, arr_merger, remap_id,
assert 'files' in process_conf, \
"'files' must be defined for a node type"
in_files = get_in_files(process_conf['files'])
(feat_ops, two_phase_feat_ops, after_merge_feat_ops) = \
parse_feat_ops(process_conf['features']) \
if 'features' in process_conf else (None, [], {})
label_ops = parse_label_ops(process_conf, is_node=True) \
if 'labels' in process_conf else None
assert 'format' in process_conf, \
"'format' must be defined for a node type"
(feat_ops, two_phase_feat_ops, after_merge_feat_ops, _) = \
parse_feat_ops(process_conf['features'], process_conf['format']['name']) \
if 'features' in process_conf else (None, [], {}, [])
label_ops = parse_label_ops(process_conf, is_node=True) \
if 'labels' in process_conf else None

# If it requires multiprocessing, we need to read data to memory.
node_id_col = process_conf['node_id_col'] if 'node_id_col' in process_conf else None
@@ -460,7 +462,14 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
Returns
-------
dict: edge features.
edges: dict
Edges.
edge_data: dict
Edge features.
label_stats: dict
Edge label statistics.
hard_edge_neg_ops: list
Hard edge negative ops.
"""
edges = {}
edge_data = {}
@@ -475,9 +484,9 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
in_files = get_in_files(process_conf['files'])
assert 'format' in process_conf, \
"'format' is not defined for an edge type."
(feat_ops, two_phase_feat_ops, after_merge_feat_ops) = \
parse_feat_ops(process_conf['features']) \
if 'features' in process_conf else (None, [], {})
(feat_ops, two_phase_feat_ops, after_merge_feat_ops, hard_edge_neg_ops) = \
parse_feat_ops(process_conf['features'], process_conf['format']['name'])\
if 'features' in process_conf else (None, [], {}, [])
label_ops = parse_label_ops(process_conf, is_node=False) \
if 'labels' in process_conf else None

@@ -487,6 +496,11 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
id_map = {edge_type[0]: node_id_map[edge_type[0]],
edge_type[2]: node_id_map[edge_type[2]]}

# For edge hard negative transformation ops, more information is needed
for op in hard_edge_neg_ops:
op.set_target_etype(edge_type)
op.set_id_maps(id_map)

multiprocessing = do_multiprocess_transform(process_conf,
feat_ops,
label_ops,
@@ -580,7 +594,7 @@ def process_edge_data(process_confs, node_id_map, arr_merger,
f"does not match the number of edges of {edge_type}. " \
f"Expecting {len(edges[edge_type][0])}, but get {len(efeats)}"

return (edges, edge_data, label_stats)
return (edges, edge_data, label_stats, hard_edge_neg_ops)

def verify_confs(confs):
""" Verify the configuration of the input data.
@@ -684,7 +698,7 @@ def process_graph(args):
args.remap_node_id, ext_mem_workspace,
num_processes=num_processes_for_nodes)
sys_tracker.check('Process the node data')
edges, edge_data, edge_label_stats = \
edges, edge_data, edge_label_stats, hard_edge_neg_ops = \
process_edge_data(process_confs['edges'], node_id_map,
convert2ext_mem, ext_mem_workspace,
num_processes=num_processes_for_edges,
@@ -734,6 +748,12 @@ def process_graph(args):
save_mapping=True, # always save mapping
part_method=args.part_method)

# If hard negatives are used, we need to remap their node IDs.
if len(hard_edge_neg_ops) > 0:
# we need to load each partition file to remap the node ids.
hard_edge_neg_feats = get_hard_edge_negs_feats(hard_edge_neg_ops)
shuffle_hard_nids(args.output_dir, args.num_parts, hard_edge_neg_feats)

if "DGL" in output_format:
for ntype in node_data:
for name, ndata in node_data[ntype].items():
14 changes: 12 additions & 2 deletions python/graphstorm/gconstruct/file_io.py
@@ -20,6 +20,7 @@
import glob
import json
import os
import logging

import pyarrow.parquet as pq
import pyarrow as pa
@@ -215,8 +216,17 @@ def read_data_parquet(data_file, data_fields=None):
# save them as objects in parquet. We need to merge them
# together and store them in a tensor.
if d.dtype.hasobject and isinstance(d[0], np.ndarray):
d = [d[i] for i in range(len(d))]
d = np.stack(d)
new_d = [d[i] for i in range(len(d))]
try:
# if each row has the same shape
# merge them together
d = np.stack(new_d)
except Exception: # pylint: disable=broad-exception-caught
# keep it as an ndarray of ndarrays
# This may happen when loading hard negatives for the hard negative transformation.
logging.warning("The %s column of parquet file %s has "
"variable-length features; this is only supported when "
"the transformation is a hard negative transformation.", key, data_file)
data[key] = d
return data

10 changes: 10 additions & 0 deletions python/graphstorm/gconstruct/id_map.py
@@ -149,6 +149,16 @@ def __init__(self, ids):
def __len__(self):
return len(self._ids)

@property
def map_key_dtype(self):
""" Return the data type of map keys.
"""
for id_ in self._ids:
if isinstance(id_, np.ndarray):
return id_.dtype
else:
return type(id_)

def map_id(self, ids):
""" Map the input IDs to the new IDs.