Skip to content

Commit

Permalink
Merge branch 'main' into gsp_custom_split
Browse files Browse the repository at this point in the history
  • Loading branch information
jalencato authored Oct 2, 2024
2 parents 6e2db07 + 2fd1511 commit 0d03e80
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,5 @@ in :ref:`gsp-examining-output`.
Run distributed partitioning and training on Amazon SageMaker
-------------------------------------------------------------

With the data now processed you can follow the
`GraphStorm Amazon SageMaker guide
<https://graphstorm.readthedocs.io/en/latest/scale/sagemaker.html#run-graphstorm-on-sagemaker>`_
With the data now processed you can follow the :ref:`GraphStorm Amazon SageMaker guide<distributed-sagemaker>`
to partition your data and run training on AWS.
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,5 @@ Run distributed partitioning and training on Amazon SageMaker
-------------------------------------------------------------

With the data now processed you can follow the
`GraphStorm Amazon SageMaker guide
<https://graphstorm.readthedocs.io/en/latest/scale/sagemaker.html#run-graphstorm-on-sagemaker>`_
:ref:`GraphStorm Amazon SageMaker guide<distributed-sagemaker>`
to partition your data and run training on AWS.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,5 @@ Run distributed partitioning and training on Amazon SageMaker
-------------------------------------------------------------

With the data now processed you can follow the
`GraphStorm Amazon SageMaker guide
<https://graphstorm.readthedocs.io/en/latest/scale/sagemaker.html#run-graphstorm-on-sagemaker>`_
:ref:`GraphStorm Amazon SageMaker guide<distributed-sagemaker>`
to partition your data and run training on AWS.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ You can clone the GraphStorm repository using
git clone https://github.com/awslabs/graphstorm.git
You can then navigate to the ``graphstorm-processing/docker`` directory
You can then navigate to the ``graphstorm-processing/`` directory
that contains the relevant code:

.. code-block:: bash
cd ./graphstorm/graphstorm-processing/docker
cd ./graphstorm/graphstorm-processing/
Install Docker
--------------
Expand Down
33 changes: 21 additions & 12 deletions graphstorm-processing/graphstorm_processing/distributed_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import tempfile
import time
from collections.abc import Mapping
from typing import Any, Dict
from typing import Any, Dict, Optional

import boto3
import botocore
Expand Down Expand Up @@ -106,8 +106,8 @@ class ExecutorConfig:
The filesystem type, can be LOCAL or S3
add_reverse_edges : bool
Whether to create reverse edges for each edge type.
graph_name: str
The name of the graph being processed
graph_name: str, optional
The name of the graph being processed. If not provided we use part of the input_prefix.
do_repartition: bool
Whether to apply repartitioning to the graph on the Spark leader.
"""
Expand All @@ -121,7 +121,7 @@ class ExecutorConfig:
config_filename: str
filesystem_type: FilesystemType
add_reverse_edges: bool
graph_name: str
graph_name: Optional[str]
do_repartition: bool


Expand All @@ -135,7 +135,7 @@ class GSProcessingArguments:
num_output_files: int
add_reverse_edges: bool
log_level: str
graph_name: str
graph_name: Optional[str]
do_repartition: bool


Expand All @@ -162,7 +162,14 @@ def __init__(
self.filesystem_type = executor_config.filesystem_type
self.execution_env = executor_config.execution_env
self.add_reverse_edges = executor_config.add_reverse_edges
self.graph_name = executor_config.graph_name
# We use the data location as the graph name if a name is not provided
if executor_config.graph_name:
self.graph_name = executor_config.graph_name
else:
derived_name = s3_utils.s3_path_remove_trailing(self.input_prefix).split("/")[-1]
logging.warning("Setting graph name derived from input path: %s", derived_name)
self.graph_name = derived_name
check_graph_name(self.graph_name)
self.repartition_on_leader = executor_config.do_repartition
# Input config dict using GSProcessing schema
self.gsp_config_dict = {}
Expand Down Expand Up @@ -541,11 +548,14 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--graph-name",
type=str,
help="Name for the graph being processed."
"The graph name must adhere to the Python "
"identifier naming rules with the exception "
"that hyphens (-) are permitted and the name "
"can start with numbers",
help=(
"Name for the graph being processed."
"The graph name must adhere to the Python "
"identifier naming rules with the exception "
"that hyphens (-) are permitted and the name "
"can start with numbers. If not provided, we will use the last "
"section of the input prefix path."
),
required=False,
default=None,
)
Expand Down Expand Up @@ -604,7 +614,6 @@ def main():
level=gsprocessing_args.log_level,
format="[GSPROCESSING] %(asctime)s %(levelname)-8s %(message)s",
)
check_graph_name(gsprocessing_args.graph_name)

# Determine execution environment
if os.path.exists("/opt/ml/config/processingjobconfig.json"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ def process_and_write_graph_data(
self.timers["process_edge_data"] = perf_counter() - edges_start_time
metadata_dict["edge_data"] = edge_data_dict
metadata_dict["edges"] = edge_structure_dict
# We use the data location as the graph name, can also take from user?
# TODO: Fix this, take from config?
metadata_dict["graph_name"] = (
self.graph_name if self.graph_name else self.input_prefix.split("/")[-1]
)

# Ensure output dict has the correct order of keys
for edge_type in metadata_dict["edge_type"]:
Expand Down Expand Up @@ -447,6 +442,8 @@ def _initialize_metadata_dict(
metadata_dict["edge_type"] = edge_types
metadata_dict["node_type"] = sorted(node_type_set)

metadata_dict["graph_name"] = self.graph_name

return metadata_dict

def _finalize_graphinfo_dict(self, metadata_dict: Dict) -> Dict:
Expand Down
56 changes: 38 additions & 18 deletions graphstorm-processing/tests/test_dist_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ def user_state_categorical_precomp_file_fixture():
os.remove(precomp_file)


def test_dist_executor_run_with_precomputed(tempdir: str, user_state_categorical_precomp_file):
"""Test run function with local data"""
@pytest.fixture(name="executor_configuration")
def executor_config_fixture(tempdir: str):
"""Create a re-usable ExecutorConfig"""
input_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph")
executor_configuration = ExecutorConfig(
local_config_path=input_path,
Expand All @@ -79,6 +80,15 @@ def test_dist_executor_run_with_precomputed(tempdir: str, user_state_categorical
do_repartition=True,
)

yield executor_configuration


def test_dist_executor_run_with_precomputed(
tempdir: str,
user_state_categorical_precomp_file: str,
executor_configuration: ExecutorConfig,
):
"""Test run function with local data"""
original_precomp_file = user_state_categorical_precomp_file

with open(original_precomp_file, "r", encoding="utf-8") as f:
Expand Down Expand Up @@ -106,23 +116,8 @@ def test_dist_executor_run_with_precomputed(tempdir: str, user_state_categorical
# TODO: Verify other metadata files that verify_integ_test_output doesn't check for


def test_merge_input_and_transform_dicts(tempdir: str):
def test_merge_input_and_transform_dicts(executor_configuration: ExecutorConfig):
"""Test the _merge_config_with_transformations function with hardcoded json data"""
input_path = os.path.join(_ROOT, "resources/small_heterogeneous_graph")
executor_configuration = ExecutorConfig(
local_config_path=input_path,
local_metadata_output_path=tempdir,
input_prefix=input_path,
output_prefix=tempdir,
num_output_files=-1,
config_filename="gsprocessing-config.json",
execution_env=ExecutionEnv.LOCAL,
filesystem_type=FilesystemType.LOCAL,
add_reverse_edges=True,
graph_name="small_heterogeneous_graph",
do_repartition=True,
)

dist_executor = DistributedExecutor(executor_configuration)

pre_comp_transormations = {
Expand All @@ -148,3 +143,28 @@ def test_merge_input_and_transform_dicts(tempdir: str):
if "state" == feature["column"]:
transform_for_feature = feature["precomputed_transformation"]
assert transform_for_feature["transformation_name"] == "categorical"


def test_dist_executor_graph_name(executor_configuration: ExecutorConfig):
"""Test cases for graph name"""

# Ensure we can set a valid graph name
executor_configuration.graph_name = "2024-a_valid_name"
dist_executor = DistributedExecutor(executor_configuration)
assert dist_executor.graph_name == "2024-a_valid_name"

# Ensure default value is used when graph_name is not provided
executor_configuration.graph_name = None
dist_executor = DistributedExecutor(executor_configuration)
assert dist_executor.graph_name == "small_heterogeneous_graph"

# Ensure we raise when invalid graph name is provided
with pytest.raises(AssertionError):
executor_configuration.graph_name = "graph.name"
dist_executor = DistributedExecutor(executor_configuration)

# Ensure a valid default graph name is parsed when the input ends in '/'
executor_configuration.graph_name = None
executor_configuration.input_prefix = executor_configuration.input_prefix + "/"
dist_executor = DistributedExecutor(executor_configuration)
assert dist_executor.graph_name == "small_heterogeneous_graph"
14 changes: 12 additions & 2 deletions python/graphstorm/config/argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,12 @@ def wd_l2norm(self):
"""
# pylint: disable=no-member
if hasattr(self, "_wd_l2norm"):
return self._wd_l2norm
try:
wd_l2norm = float(self._wd_l2norm)
except:
raise ValueError("wd_l2norm must be a floating point " \
f"but get {self._wd_l2norm}")
return wd_l2norm
return 0

@property
Expand All @@ -1735,7 +1740,12 @@ def alpha_l2norm(self):
"""
# pylint: disable=no-member
if hasattr(self, "_alpha_l2norm"):
return self._alpha_l2norm
try:
alpha_l2norm = float(self._alpha_l2norm)
except:
raise ValueError("alpha_l2norm must be a floating point " \
f"but get {self._alpha_l2norm}")
return alpha_l2norm
return .0

@property
Expand Down
16 changes: 16 additions & 0 deletions tests/unit-tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ def create_train_config(tmp_path, file_name):
yaml_object["gsf"]["hyperparam"] = {
"topk_model_to_save": 4,
"save_model_path": os.path.join(tmp_path, "save"),
"wd_l2norm": 5e-5,
"alpha_l2norm": 5e-5,
}
with open(os.path.join(tmp_path, file_name+"1.yaml"), "w") as f:
yaml.dump(yaml_object, f)
Expand All @@ -261,6 +263,8 @@ def create_train_config(tmp_path, file_name):
'save_model_frequency': 2000,
"topk_model_to_save": 5,
"save_model_path": os.path.join(tmp_path, "save"),
"wd_l2norm": "1e-3",
"alpha_l2norm": "1e-3",
}
with open(os.path.join(tmp_path, file_name+"2.yaml"), "w") as f:
yaml.dump(yaml_object, f)
Expand Down Expand Up @@ -291,6 +295,8 @@ def create_train_config(tmp_path, file_name):
"use_early_stop": True,
"early_stop_burnin_rounds": -1,
"early_stop_rounds": 0,
"wd_l2norm": "NA",
"alpha_l2norm": "NA",
}

with open(os.path.join(tmp_path, file_name+"_fail.yaml"), "w") as f:
Expand All @@ -301,6 +307,8 @@ def create_train_config(tmp_path, file_name):
'save_model_frequency': 2000,
"topk_model_to_save": 3,
"save_model_path": os.path.join(tmp_path, "save"),
"wd_l2norm": "",
"alpha_l2norm": "",
}
with open(os.path.join(tmp_path, file_name+"_fail1.yaml"), "w") as f:
yaml.dump(yaml_object, f)
Expand Down Expand Up @@ -350,12 +358,16 @@ def test_train_info():
args = Namespace(yaml_config_file=os.path.join(Path(tmpdirname), 'train_test1.yaml'), local_rank=0)
config = GSConfig(args)
assert config.topk_model_to_save == 4
assert config.wd_l2norm == 5e-5
assert config.alpha_l2norm == 5e-5

args = Namespace(yaml_config_file=os.path.join(Path(tmpdirname), 'train_test2.yaml'), local_rank=0)
config = GSConfig(args)
assert config.eval_frequency == 1000
assert config.save_model_frequency == 2000
assert config.topk_model_to_save == 5
assert config.wd_l2norm == 1e-3
assert config.alpha_l2norm == 1e-3

args = Namespace(yaml_config_file=os.path.join(Path(tmpdirname), 'train_test3.yaml'), local_rank=0)
config = GSConfig(args)
Expand All @@ -380,12 +392,16 @@ def test_train_info():
check_failure(config, "topk_model_to_save")
check_failure(config, "early_stop_burnin_rounds")
check_failure(config, "early_stop_rounds")
check_failure(config, "wd_l2norm")
check_failure(config, "alpha_l2norm")

args = Namespace(yaml_config_file=os.path.join(Path(tmpdirname), 'train_test_fail1.yaml'), local_rank=0)
config = GSConfig(args)
# in PR # 893 we loose the constraints of model saving frequency and eval frequency
# so here we do not check failure, but check the topk model argument
assert config.topk_model_to_save == 3
check_failure(config, "wd_l2norm")
check_failure(config, "alpha_l2norm")

def create_rgcn_config(tmp_path, file_name):
yaml_object = create_dummpy_config_obj()
Expand Down

0 comments on commit 0d03e80

Please sign in to comment.