Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GSProcessing&GSPartition] Support Hard Negative #1080

Merged
merged 56 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
84babde
update gconstruct converter
Oct 29, 2024
ba88b9f
gsprocessing part
jalencato Oct 30, 2024
613814f
hard negative config
jalencato Oct 30, 2024
53ea800
add code file
jalencato Oct 30, 2024
9e9d35e
finish gsprocessing related development
jalencato Oct 31, 2024
37e30e6
add blank
jalencato Oct 31, 2024
8b702e5
tab
jalencato Oct 31, 2024
a50b657
hard negative for gspartition
jalencato Nov 1, 2024
8def740
add doc string
jalencato Nov 4, 2024
644ba29
add gsprocessing part test
jalencato Nov 4, 2024
1f085da
add test for gspartition part
jalencato Nov 4, 2024
b7bcbaa
add
jalencato Nov 4, 2024
b67836d
lint
jalencato Nov 5, 2024
10d29fb
lint
jalencato Nov 5, 2024
3fea39b
change
jalencato Nov 5, 2024
4ad0e97
black lint
jalencato Nov 5, 2024
a4dfb61
add doc to hard negative
jalencato Nov 5, 2024
8dfdf05
add doc
jalencato Nov 5, 2024
52bef41
reset test
jalencato Nov 5, 2024
4f30c7d
add test
jalencato Nov 5, 2024
308f833
simplify test
jalencato Nov 5, 2024
b6876d9
Update gconstruct_converter.py
jalencato Nov 5, 2024
bf1b0eb
Update hard_negative_configs.py
jalencato Nov 5, 2024
6d8ed96
Update dist_feature_transformer.py
jalencato Nov 5, 2024
6eef513
Update dist_hard_negative_transformation.py
jalencato Nov 5, 2024
7079f2c
add feature transformation
jalencato Nov 5, 2024
e1d6cb9
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 5, 2024
8c3c79e
add dot
jalencato Nov 6, 2024
a9dd308
hard negative config renaming
jalencato Nov 6, 2024
cd319fa
Update constants.py
jalencato Nov 6, 2024
4f25cb9
add constant
jalencato Nov 6, 2024
2c64666
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 7, 2024
ff7c470
Apply suggestions from code review
jalencato Nov 11, 2024
dbb8c0f
Apply suggestions from code review
jalencato Nov 11, 2024
ad1c786
apply comment for GSProcessing
jalencato Nov 11, 2024
2e47f2d
Apply suggestions from code review
jalencato Nov 11, 2024
e631d2c
apply comments for gspartition
jalencato Nov 11, 2024
0e7c471
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 11, 2024
ca46211
apply comment
jalencato Nov 11, 2024
eff82aa
roll back
jalencato Nov 11, 2024
9dd9ff5
apply comment
jalencato Nov 11, 2024
1ab7f6d
fix test
jalencato Nov 11, 2024
f93dff2
check existense for launch_arguments.json
jalencato Nov 11, 2024
b3760d8
check existense for launch_arguments.json
jalencato Nov 11, 2024
32f3412
change gsprocessing part
jalencato Nov 12, 2024
c61bdc9
comment
jalencato Nov 12, 2024
90fcd75
hard negative processing
jalencato Nov 12, 2024
3242953
lint
jalencato Nov 12, 2024
0dd32f9
fix comment
jalencato Nov 12, 2024
dd6cfe1
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 12, 2024
7929f05
change maximum size
jalencato Nov 12, 2024
d2e03db
black
jalencato Nov 12, 2024
d4da2ca
remove size
jalencato Nov 12, 2024
35d4cbd
test
jalencato Nov 13, 2024
11007eb
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 14, 2024
dc71cbb
Merge branch 'main' into gsprocessing-hard-negative
jalencato Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/source/advanced/link-prediction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ impact is negligible.

With DGL 1.0.4, ``fast_localuniform`` dataloader can speedup 2.4X over ``localuniform`` dataloader on training a 2 layer RGCN on MAG dataset on four g5.48x instances.

.. _hard_negative_sampling:

Hard Negative sampling
-----------------------
GraphStorm provides support for users to define hard negative edges for a positive edge during Link Prediction training.
Expand Down Expand Up @@ -272,10 +274,10 @@ In general, GraphStorm covers following cases:
Preparing graph data for hard negative sampling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The gconstruct pipeline of GraphStorm provides support to load hard negative data from raw input.
Both single machine and distributed graph construction pipeline of GraphStorm provide support to load hard negative data from raw input.
Hard destination negatives can be defined through ``edge_dst_hard_negative`` transformation.
The ``feature_col`` field of ``edge_dst_hard_negative`` must stores the raw node ids of hard destination nodes.
The follwing example shows how to define a hard negative feature for edges with the relation ``(node1, relation1, node1)``:
The following example shows how to define a hard negative feature for edges with the relation ``(node1, relation1, node1)``:

.. code-block:: json

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ arguments.
You can use a length greater than the dataset's longest sentence; or for a safe value choose 128. Make sure to check
the model's max supported length when setting this value.

- ``edge_dst_hard_negative``

- Encodes a hard negative edge feature for link prediction. For detail information for hard negative support, please refer to :ref:`hard_negative_sampling`.
- ``kwargs``:
- ``separator`` (String, optional): The separator is used to
split multiple values in an input string for data in CSV files e.g. ``p0;s1``. If it is not provided, then the whole value
will be treated as a single string.

.. _gsprocessing-multitask-ref:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ def _convert_feature(feats: list[Mapping[str, Any]]) -> list[dict]:
"hf_model": gconstruct_transform_dict["bert_model"],
"max_seq_length": gconstruct_transform_dict["max_seq_length"],
}
elif gconstruct_transform_dict["name"] == "edge_dst_hard_negative":
gsp_transformation_dict["name"] = "edge_dst_hard_negative"
if "separator" in gconstruct_transform_dict:
gsp_transformation_dict["kwargs"] = {
"separator": gconstruct_transform_dict["separator"],
}
else:
raise ValueError(
"Unsupported GConstruct transformation name: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from .categorical_configs import MultiCategoricalFeatureConfig
from .hf_configs import HFConfig
from .hard_negative_configs import HardEdgeNegativeConfig
from .data_config_base import DataStorageConfig


Expand Down Expand Up @@ -71,6 +72,8 @@ def parse_feat_config(feature_dict: Dict) -> FeatureConfig:
return MultiCategoricalFeatureConfig(feature_dict)
elif transformation_name == "huggingface":
return HFConfig(feature_dict)
elif transformation_name == "edge_dst_hard_negative":
return HardEdgeNegativeConfig(feature_dict)
else:
raise RuntimeError(f"Unknown transformation name: '{transformation_name}'")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Mapping

from .feature_config_base import FeatureConfig


class HardEdgeNegativeConfig(FeatureConfig):
"""Feature configuration for hard negative feature. Now only support link prediction.

Supported kwargs
----------------
separator: str, optional
The separator for string input value. Only required when input value type is string.
"""

def __init__(self, config: Mapping):
super().__init__(config)
self.separator = self._transformation_kwargs.get("separator", None)

self._sanity_check()
8 changes: 8 additions & 0 deletions graphstorm-processing/graphstorm_processing/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@
HUGGINGFACE_TOKENIZE = "tokenize_hf"
HUGGINGFACE_EMB = "embedding_hf"

################# Hard Negative transformations ################
ORDER_INDEX = "hard_negative_order_id"
EXPLODE_HARD_NEGATIVE_VALUE = "hard_negative_exploded_single_value"

################# Node Mapping ################
NODE_MAPPING_STR = "orig"
jalencato marked this conversation as resolved.
Show resolved Hide resolved
NODE_MAPPING_INT = "new"


################# Supported execution envs ##############
class ExecutionEnv(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DistCategoryTransformation,
DistMultiCategoryTransformation,
DistHFTransformation,
DistHardEdgeNegativeTransformation,
)


Expand Down Expand Up @@ -71,6 +72,10 @@ def __init__(
self.transformation = DistMultiCategoryTransformation(**default_kwargs, **args_dict)
elif feat_type == "huggingface":
self.transformation = DistHFTransformation(**default_kwargs, **args_dict)
elif feat_type == "edge_dst_hard_negative":
self.transformation = DistHardEdgeNegativeTransformation(
**default_kwargs, **args_dict, spark=spark
)
else:
raise NotImplementedError(
f"Feature {feat_name} has type: {feat_type} that is not supported"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
)
from .dist_bucket_numerical_transformation import DistBucketNumericalTransformation
from .dist_hf_transformation import DistHFTransformation
from .dist_hard_negative_transformation import DistHardEdgeNegativeTransformation
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Sequence
from pyspark.sql.functions import split, col
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql import DataFrame, functions as F, SparkSession

from graphstorm_processing.constants import (
NODE_MAPPING_STR,
NODE_MAPPING_INT,
ORDER_INDEX,
EXPLODE_HARD_NEGATIVE_VALUE,
)

from .base_dist_transformation import DistributedTransformation


class DistHardEdgeNegativeTransformation(DistributedTransformation):
"""Transformation to apply hard negative transformation.

Parameters
----------
cols : Sequence[str]
List of column names to apply hard negative transformation to.
spark: SparkSession
The spark session.
hard_node_mapping_dict: dict
The mapping dictionary contain mapping file directory and edge type.
jalencato marked this conversation as resolved.
Show resolved Hide resolved
{
"edge_type": str
Edge type to apply hard negative transformation.
"mapping_path": str
Path to the raw node mapping.
"format_name": str
Parquet.
}
separator: str, optional
The separator for string input value. Only required when input value type is string.
"""

def __init__(
self,
cols: Sequence[str],
spark: SparkSession,
hard_node_mapping_dict: dict,
separator: str = "",
) -> None:
super().__init__(cols, spark)
self.cols = cols
assert len(self.cols) == 1, "Hard Negative Transformation only supports single column"
self.separator = separator
self.hard_node_mapping_dict = hard_node_mapping_dict
assert self.hard_node_mapping_dict, "edge mapping dict cannot be None for hard negative "

def apply(self, input_df: DataFrame) -> DataFrame:
assert self.spark
input_col = self.cols[0]
column_type = input_df.schema[input_col].dataType
if isinstance(column_type, StringType):
transformed_df = input_df.withColumn(input_col, split(col(input_col), self.separator))
else:
transformed_df = input_df
# Edge type should be (src_ntype:relation_type:dst_ntype)
# Only support hard negative for destination nodes. Get the node type of destination nodes.
# TODO: support hard negative for source nodes.
_, _, dst_type = self.hard_node_mapping_dict["edge_type"].split(":")
mapping_prefix = self.hard_node_mapping_dict["mapping_path"]
format_name = self.hard_node_mapping_dict["format_name"]
hard_negative_node_mapping = self.spark.read.parquet(
f"{mapping_prefix}{dst_type}/{format_name}/"
)
# The maximum number of negatives in the input feature column
max_size = (
jalencato marked this conversation as resolved.
Show resolved Hide resolved
transformed_df.select(F.size(F.col(input_col)).alias(f"{input_col}_size"))
.agg(F.max(f"{input_col}_size"))
.collect()[0][0]
)

# TODO: Use panda series to possibly improve the efficiency
# Explode the original list and join node id mapping dataframe
transformed_df = transformed_df.withColumn(ORDER_INDEX, F.monotonically_increasing_id())
# Could result in extremely large DFs in num_nodes * avg(len_of_negatives) rows
transformed_df = transformed_df.withColumn(
EXPLODE_HARD_NEGATIVE_VALUE, F.explode(F.col(input_col))
jalencato marked this conversation as resolved.
Show resolved Hide resolved
)
transformed_df = transformed_df.join(
hard_negative_node_mapping,
transformed_df[EXPLODE_HARD_NEGATIVE_VALUE]
== hard_negative_node_mapping[NODE_MAPPING_STR],
"inner",
).select(NODE_MAPPING_INT, ORDER_INDEX)
transformed_df = transformed_df.groupBy(ORDER_INDEX).agg(
F.collect_list(NODE_MAPPING_INT).alias(input_col)
)

# Extend the feature to the same length as the maximum length of the feature column
def pad_mapped_values(hard_neg_list):
if len(hard_neg_list) < max_size:
hard_neg_list.extend([-1] * (max_size - len(hard_neg_list)))
return hard_neg_list

pad_value_udf = F.udf(pad_mapped_values, ArrayType(IntegerType()))
# Make sure it keeps the original order
transformed_df = transformed_df.orderBy(ORDER_INDEX)
transformed_df = transformed_df.select(pad_value_udf(F.col(input_col)).alias(input_col))

return transformed_df

@staticmethod
def get_transformation_name() -> str:
return "DistHardEdgeNegativeTransformation"
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,15 @@ def _process_edge_features(
.get(edge_type, {})
.get(feat_conf.feat_name, {})
)
# Hard Negative Transformation use case, but should be able to be reused
jalencato marked this conversation as resolved.
Show resolved Hide resolved
if feat_conf.feat_type == "edge_dst_hard_negative":
hard_node_mapping_dict = {
"edge_type": edge_type,
"mapping_path": f"{self.output_prefix}/raw_id_mappings/",
"format_name": FORMAT_NAME,
}
feat_conf.transformation_kwargs["hard_node_mapping_dict"] = hard_node_mapping_dict

transformer = DistFeatureTransformer(feat_conf, self.spark, json_representation)

if json_representation:
Expand Down
16 changes: 14 additions & 2 deletions graphstorm-processing/tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,14 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
"files": ["/tmp/acm_raw/edges/author_writing_paper.parquet"],
"source_id_col": "~from",
"dest_id_col": "~to",
"features": [{"feature_col": ["author"], "feature_name": "feat"}],
"features": [
{"feature_col": ["author"], "feature_name": "feat"},
{
"feature_col": ["author"],
"feature_name": "hard_negative",
"transform": {"name": "edge_dst_hard_negative", "separator": ";"},
},
],
"labels": [
{
"label_col": "edge_col",
Expand Down Expand Up @@ -505,7 +512,12 @@ def test_convert_gsprocessing(converter: GConstructConfigConverter):
assert edges_output["dest"] == {"column": "~to", "type": "paper"}
assert edges_output["relation"] == {"type": "writing"}
assert edges_output["features"] == [
{"column": "author", "transformation": {"name": "no-op"}, "name": "feat"}
{"column": "author", "transformation": {"name": "no-op"}, "name": "feat"},
{
"column": "author",
"name": "hard_negative",
"transformation": {"name": "edge_dst_hard_negative", "kwargs": {"separator": ";"}},
},
]
assert edges_output["labels"] == [
{
Expand Down
Loading
Loading