[GSProcessing&GSPartition] Support Hard Negative (#1080)
*Issue #, if available:*

*Description of changes:*

* Add hard negative feature support to the distributed graph
construction pipeline. In the partition stage, users do not need to
take any additional steps for the hard negative feature transformation itself.
* Implement it in the way we agreed on in the design doc.
* Add extra steps for distributed partitioning, because the previous
script cannot be fully reused: the mapping files are handled differently.
I created a separate script in the gspartition code to make it easier to maintain.
* Tested with 1M nodes and 1M edges.

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: EC2 Default User <[email protected]>
Co-authored-by: xiang song(charlie.song) <[email protected]>
3 people authored Nov 15, 2024
1 parent 5df3ad4 commit c964cc4
Showing 20 changed files with 777 additions and 20 deletions.
6 changes: 4 additions & 2 deletions docs/source/advanced/link-prediction.rst
@@ -236,6 +236,8 @@ impact is negligible.

With DGL 1.0.4, ``fast_localuniform`` dataloader can speedup 2.4X over ``localuniform`` dataloader on training a 2 layer RGCN on MAG dataset on four g5.48x instances.

.. _hard_negative_sampling:

Hard Negative sampling
-----------------------
GraphStorm provides support for users to define hard negative edges for a positive edge during Link Prediction training.
@@ -272,10 +274,10 @@ In general, GraphStorm covers following cases:
Preparing graph data for hard negative sampling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The gconstruct pipeline of GraphStorm provides support to load hard negative data from raw input.
Both the single-machine and the distributed graph construction pipelines of GraphStorm support loading hard negative data from raw input.
Hard destination negatives can be defined through ``edge_dst_hard_negative`` transformation.
The ``feature_col`` field of ``edge_dst_hard_negative`` must store the raw node ids of hard destination nodes.
The follwing example shows how to define a hard negative feature for edges with the relation ``(node1, relation1, node1)``:
The following example shows how to define a hard negative feature for edges with the relation ``(node1, relation1, node1)``:

.. code-block:: json
@@ -491,6 +491,13 @@ arguments.
You can use a length greater than the dataset's longest sentence; or for a safe value choose 128. Make sure to check
the model's max supported length when setting this value.

- ``edge_dst_hard_negative``

- Encodes a hard negative edge feature for link prediction. For detailed information on hard negative support, please refer to :ref:`hard_negative_sampling`.
- ``kwargs``:
- ``separator`` (String, optional): The separator is used to
split multiple values in an input string for data in CSV files e.g. ``p0;s1``. If it is not provided, then the whole value
will be treated as a single string.
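
As a rough illustration of the ``separator`` kwarg described above, the sketch below splits one raw CSV value into individual node ids. The helper ``split_hard_negatives`` is hypothetical and not part of GraphStorm; it only mirrors the documented behavior that a value is kept as a single string when no separator is given. The feature entry uses the shape that appears in the converter test of this commit.

```python
from typing import Optional


def split_hard_negatives(raw_value: str, separator: Optional[str] = None) -> list[str]:
    """Hypothetical helper: split one raw CSV cell into hard negative node ids."""
    if separator is None:
        # No separator configured: the whole value is treated as a single id.
        return [raw_value]
    return raw_value.split(separator)


# Feature entry in the shape used by the converter test in this commit.
feature_config = {
    "column": "author",
    "name": "hard_negative",
    "transformation": {"name": "edge_dst_hard_negative", "kwargs": {"separator": ";"}},
}

sep = feature_config["transformation"]["kwargs"].get("separator")
print(split_hard_negatives("p0;s1", sep))  # ['p0', 's1']
print(split_hard_negatives("p0;s1"))       # ['p0;s1']
```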

.. _gsprocessing-multitask-ref:

@@ -188,6 +188,12 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
"hf_model": gconstruct_transform_dict["bert_model"],
"max_seq_length": gconstruct_transform_dict["max_seq_length"],
}
elif gconstruct_transform_dict["name"] == "edge_dst_hard_negative":
gsp_transformation_dict["name"] = "edge_dst_hard_negative"
if "separator" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"] = {
"separator": gconstruct_transform_dict["separator"],
}
else:
raise ValueError(
"Unsupported GConstruct transformation name: "
@@ -29,6 +29,7 @@
)
from .categorical_configs import MultiCategoricalFeatureConfig
from .hf_configs import HFConfig
from .hard_negative_configs import HardEdgeNegativeConfig
from .data_config_base import DataStorageConfig


@@ -71,6 +72,8 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig:
return MultiCategoricalFeatureConfig(feature_dict)
elif transformation_name == "huggingface":
return HFConfig(feature_dict)
elif transformation_name == "edge_dst_hard_negative":
return HardEdgeNegativeConfig(feature_dict)
else:
raise RuntimeError(f"Unknown transformation name: '{transformation_name}'")

@@ -0,0 +1,35 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Mapping

from .feature_config_base import FeatureConfig


class HardEdgeNegativeConfig(FeatureConfig):
    """Feature configuration for hard negative feature. Now only support link prediction.

    Supported kwargs
    ----------------
    separator: str, optional
        The separator for string input value. Only required when input value type is string.
    """

    def __init__(self, config: Mapping):
        super().__init__(config)
        self.separator = self._transformation_kwargs.get("separator", None)

        self._sanity_check()
8 changes: 8 additions & 0 deletions graphstorm-processing/graphstorm_processing/constants.py
@@ -58,6 +58,14 @@
HUGGINGFACE_TOKENIZE = "tokenize_hf"
HUGGINGFACE_EMB = "embedding_hf"

################# Hard Negative transformations ################
ORDER_INDEX = "hard_negative_order_id"
EXPLODE_HARD_NEGATIVE_VALUE = "hard_negative_exploded_single_value"

################# Node Mapping ################
NODE_MAPPING_STR = "orig"
NODE_MAPPING_INT = "new"


################# Supported execution envs ##############
class ExecutionEnv(Enum):
@@ -28,6 +28,7 @@
DistCategoryTransformation,
DistMultiCategoryTransformation,
DistHFTransformation,
DistHardEdgeNegativeTransformation,
)


@@ -71,6 +72,10 @@ def __init__(
self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict)
elif feat_type == "huggingface":
self.transformation = DistHFTransformation(**default_kwargs, **args_dict)
elif feat_type == "edge_dst_hard_negative":
self.transformation = DistHardEdgeNegativeTransformation(
**default_kwargs, **args_dict, spark=spark
)
else:
raise NotImplementedError(
f"Feature {feat_name} has type: {feat_type} that is not supported"
@@ -15,3 +15,4 @@
)
from .dist_bucket_numerical_transformation import DistBucketNumericalTransformation
from .dist_hf_transformation import DistHFTransformation
from .dist_hard_negative_transformation import DistHardEdgeNegativeTransformation
@@ -0,0 +1,125 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Sequence
from pyspark.sql.functions import split, col
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql import DataFrame, functions as F, SparkSession

from graphstorm_processing.constants import (
NODE_MAPPING_STR,
NODE_MAPPING_INT,
ORDER_INDEX,
EXPLODE_HARD_NEGATIVE_VALUE,
)

from .base_dist_transformation import DistributedTransformation


class DistHardEdgeNegativeTransformation(DistributedTransformation):
    """Transformation to apply hard negative transformation.

    Parameters
    ----------
    cols : Sequence[str]
        List of column names to apply hard negative transformation to.
    spark: SparkSession
        The spark session.
    hard_node_mapping_dict: dict
        The mapping dictionary contain mapping file directory and edge type.
        {
            "edge_type": str
                Edge type to apply hard negative transformation.
            "mapping_path": str
                Path to the raw node mapping.
            "format_name": str
                Parquet.
        }
    separator: str, optional
        The separator for string input value. Only required when input value type is string.
    """

    def __init__(
        self,
        cols: Sequence[str],
        spark: SparkSession,
        hard_node_mapping_dict: dict,
        separator: str = "",
    ) -> None:
        super().__init__(cols, spark)
        self.cols = cols
        assert len(self.cols) == 1, "Hard Negative Transformation only supports single column"
        self.separator = separator
        self.hard_node_mapping_dict = hard_node_mapping_dict
        assert self.hard_node_mapping_dict, "edge mapping dict cannot be None for hard negative "

    def apply(self, input_df: DataFrame) -> DataFrame:
        assert self.spark
        input_col = self.cols[0]
        column_type = input_df.schema[input_col].dataType
        if isinstance(column_type, StringType):
            transformed_df = input_df.withColumn(input_col, split(col(input_col), self.separator))
        else:
            transformed_df = input_df
        # Edge type should be (src_ntype:relation_type:dst_ntype)
        # Only support hard negative for destination nodes. Get the node type of destination nodes.
        # TODO: support hard negative for source nodes.
        _, _, dst_type = self.hard_node_mapping_dict["edge_type"].split(":")
        mapping_prefix = self.hard_node_mapping_dict["mapping_path"]
        format_name = self.hard_node_mapping_dict["format_name"]
        hard_negative_node_mapping = self.spark.read.parquet(
            f"{mapping_prefix}{dst_type}/{format_name}/"
        )
        # The maximum number of negatives in the input feature column
        max_size = (
            transformed_df.select(F.size(F.col(input_col)).alias(f"{input_col}_size"))
            .agg(F.max(f"{input_col}_size"))
            .collect()[0][0]
        )

        # TODO: Use panda series to possibly improve the efficiency
        # Explode the original list and join node id mapping dataframe
        transformed_df = transformed_df.withColumn(ORDER_INDEX, F.monotonically_increasing_id())
        # Could result in extremely large DFs in num_nodes * avg(len_of_negatives) rows
        transformed_df = transformed_df.withColumn(
            EXPLODE_HARD_NEGATIVE_VALUE, F.explode(F.col(input_col))
        )
        transformed_df = transformed_df.join(
            hard_negative_node_mapping,
            transformed_df[EXPLODE_HARD_NEGATIVE_VALUE]
            == hard_negative_node_mapping[NODE_MAPPING_STR],
            "inner",
        ).select(NODE_MAPPING_INT, ORDER_INDEX)
        transformed_df = transformed_df.groupBy(ORDER_INDEX).agg(
            F.collect_list(NODE_MAPPING_INT).alias(input_col)
        )

        # Extend the feature to the same length as the maximum length of the feature column
        def pad_mapped_values(hard_neg_list):
            if len(hard_neg_list) < max_size:
                hard_neg_list.extend([-1] * (max_size - len(hard_neg_list)))
            return hard_neg_list

        pad_value_udf = F.udf(pad_mapped_values, ArrayType(IntegerType()))
        # Make sure it keeps the original order
        transformed_df = transformed_df.orderBy(ORDER_INDEX)
        transformed_df = transformed_df.select(pad_value_udf(F.col(input_col)).alias(input_col))

        return transformed_df

    @staticmethod
    def get_transformation_name() -> str:
        return "DistHardEdgeNegativeTransformation"
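
To make the data flow of `apply` easier to follow, here is a minimal pure-Python sketch of the same idea without Spark: map each raw id through the node mapping, drop ids that have no mapping (the effect of the inner join), and pad every row with `-1` up to the longest input list. The function name `map_and_pad_hard_negatives` and the sample data are hypothetical. One deliberate difference: this sketch keeps a row even when none of its ids can be mapped, whereas the Spark version, as written, appears to lose such rows after the explode and inner join.

```python
def map_and_pad_hard_negatives(rows, node_mapping, pad_value=-1):
    """Map raw hard negative ids to integer ids and pad rows to equal length.

    rows: list of lists of raw node ids, one list per edge.
    node_mapping: dict from raw node id to integer node id.
    """
    # Width of the output: the longest list in the raw input.
    max_size = max((len(row) for row in rows), default=0)
    padded = []
    for row in rows:
        # Equivalent of explode + inner join: ids missing from the mapping are dropped.
        mapped = [node_mapping[raw_id] for raw_id in row if raw_id in node_mapping]
        # Pad with pad_value so every row has max_size entries.
        mapped += [pad_value] * (max_size - len(mapped))
        padded.append(mapped)
    return padded


# Hypothetical sample data: three edges, one id ("zzz") absent from the mapping.
raw_rows = [["a", "b"], ["c"], ["b", "zzz", "a"]]
mapping = {"a": 0, "b": 1, "c": 2}
print(map_and_pad_hard_negatives(raw_rows, mapping))
# [[0, 1, -1], [2, -1, -1], [1, 0, -1]]
```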
@@ -1654,6 +1654,15 @@ def _process_edge_features(
.get(edge_type, {})
.get(feat_conf.feat_name, {})
)
# Hard Negative Transformation use case, but should be able to be reused
if feat_conf.feat_type == "edge_dst_hard_negative":
hard_node_mapping_dict = {
"edge_type": edge_type,
"mapping_path": f"{self.output_prefix}/raw_id_mappings/",
"format_name": FORMAT_NAME,
}
feat_conf.transformation_kwargs["hard_node_mapping_dict"] = hard_node_mapping_dict

transformer = DistFeatureTransformer(feat_conf, self.spark, json_representation)

if json_representation:
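
The dictionary built in this hunk is later consumed by `DistHardEdgeNegativeTransformation.apply`, which derives the location of the destination node type's raw id mapping from it. The sketch below reproduces only that string assembly. The helper name `build_mapping_path`, the bucket prefix, and the value `"parquet"` for `FORMAT_NAME` are assumptions for illustration; the actual constant is not shown in this diff.

```python
def build_mapping_path(hard_node_mapping_dict: dict) -> str:
    """Hypothetical helper: assemble the path read by the transformation."""
    # Edge type is expected as "src_ntype:relation_type:dst_ntype".
    _, _, dst_type = hard_node_mapping_dict["edge_type"].split(":")
    mapping_prefix = hard_node_mapping_dict["mapping_path"]
    format_name = hard_node_mapping_dict["format_name"]
    return f"{mapping_prefix}{dst_type}/{format_name}/"


example = {
    "edge_type": "author:writing:paper",
    "mapping_path": "s3://my-bucket/output/raw_id_mappings/",  # hypothetical prefix
    "format_name": "parquet",  # assumed value of FORMAT_NAME
}
print(build_mapping_path(example))
# s3://my-bucket/output/raw_id_mappings/paper/parquet/
```

Note that the edge type must contain exactly two colons, otherwise the tuple unpacking raises a `ValueError`.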
16 changes: 14 additions & 2 deletions graphstorm-processing/tests/test_converter.py
@@ -401,7 +401,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
"files": ["/tmp/acm_raw/edges/author_writing_paper.parquet"],
"source_id_col": "~from",
"dest_id_col": "~to",
"features": [{"feature_col": ["author"], "feature_name": "feat"}],
"features": [
{"feature_col": ["author"], "feature_name": "feat"},
{
"feature_col": ["author"],
"feature_name": "hard_negative",
"transform": {"name": "edge_dst_hard_negative", "separator": ";"},
},
],
"labels": [
{
"label_col": "edge_col",
@@ -505,7 +512,12 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
assert edges_output["dest"] == {"column": "~to", "type": "paper"}
assert edges_output["relation"] == {"type": "writing"}
assert edges_output["features"] == [
{"column": "author", "transformation": {"name": "no-op"}, "name": "feat"}
{"column": "author", "transformation": {"name": "no-op"}, "name": "feat"},
{
"column": "author",
"name": "hard_negative",
"transformation": {"name": "edge_dst_hard_negative", "kwargs": {"separator": ";"}},
},
]
assert edges_output["labels"] == [
{
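
The conversion that this test checks can be sketched as a standalone function. This is a simplified illustration of the new `edge_dst_hard_negative` branch in `_convert_feature`, not the converter itself; the function name `convert_hard_negative` is hypothetical.

```python
from typing import Any, Mapping


def convert_hard_negative(gconstruct_transform: Mapping[str, Any]) -> dict:
    """Hypothetical standalone version of the new converter branch."""
    if gconstruct_transform["name"] != "edge_dst_hard_negative":
        raise ValueError(
            "Unsupported GConstruct transformation name: " f"{gconstruct_transform['name']}"
        )
    gsp_transform: dict = {"name": "edge_dst_hard_negative"}
    # kwargs is only emitted when a separator is present in the input.
    if "separator" in gconstruct_transform:
        gsp_transform["kwargs"] = {"separator": gconstruct_transform["separator"]}
    return gsp_transform


print(convert_hard_negative({"name": "edge_dst_hard_negative", "separator": ";"}))
# {'name': 'edge_dst_hard_negative', 'kwargs': {'separator': ';'}}
```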