Commit d078d0b
Add parameters for separate GPU and CPU images, instance volume size.
thvasilo committed Dec 17, 2024
1 parent a0be6a2 commit d078d0b
Showing 4 changed files with 93 additions and 47 deletions.
2 changes: 2 additions & 0 deletions python/graphstorm/gconstruct/file_io.py
@@ -88,6 +88,8 @@ def expand_wildcard(data_files: List[str]) -> List[str]:
"""
expanded_files = []
+ if len(data_files) == 1 and os.path.isdir(data_files[0]):
+     data_files = [os.path.join(data_files[0], "*")]
for item in data_files:
if '*' in item:
matched_files = sorted(glob.glob(item))
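The new branch above lets a caller pass a single directory instead of an explicit wildcard. A minimal sketch of the resulting behavior; the tail of the loop (the non-wildcard branch and the return) is not shown in the hunk and is assumed from context:

import glob
import os
from typing import List

def expand_wildcard(data_files: List[str]) -> List[str]:
    # New behavior: a lone directory argument is expanded as "<dir>/*".
    expanded_files = []
    if len(data_files) == 1 and os.path.isdir(data_files[0]):
        data_files = [os.path.join(data_files[0], "*")]
    for item in data_files:
        if '*' in item:
            # Sorted for deterministic file ordering across runs.
            expanded_files.extend(sorted(glob.glob(item)))
        else:
            expanded_files.append(item)  # assumed: literal paths pass through
    return expanded_files

With this change, expand_wildcard(["data/nodes"]) behaves like expand_wildcard(["data/nodes/*"]).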
33 changes: 26 additions & 7 deletions sagemaker/pipeline/create_sm_pipeline.py
@@ -89,6 +89,13 @@ def __init__(
if self.args.instance_config.train_on_cpu
else self.gpu_instance_type_param
)
+ self.train_infer_image = (
+     args.aws_config.graphstorm_pytorch_cpu_image_url
+     if self.args.instance_config.train_on_cpu
+     else args.aws_config.graphstorm_pytorch_gpu_image_url
+ )

def _get_or_create_pipeline_session(
self, input_session: Optional[PipelineSession] = None
@@ -184,6 +191,10 @@ def _create_pipeline_parameters(self, args: PipelineArgs):
"GraphConstructInstanceType",
args.instance_config.graph_construction_instance_type,
)
+ self.volume_size_gb_param = self._create_int_parameter(
+     "InstanceVolumeSizeGB",
+     args.instance_config.volume_size_gb,
+ )
self.graphconstruct_config_param = self._create_string_parameter(
"GraphConstructConfigFile", args.graph_construction_config.config_filename
)
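The new InstanceVolumeSizeGB knob is presumably a standard SageMaker integer pipeline parameter under the hood; _create_int_parameter is not shown in this diff, so the following equivalent is an assumption about its effect:

from sagemaker.workflow.parameters import ParameterInteger

# Hypothetical equivalent of the _create_int_parameter call above.
volume_size_gb_param = ParameterInteger(
    name="InstanceVolumeSizeGB",
    default_value=100,  # mirrors the --volume-size-gb default added in this commit
)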
@@ -279,12 +290,13 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep:
)

gconstruct_processor = ScriptProcessor(
- image_uri=args.aws_config.graphstorm_pytorch_image_url,
+ image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url,
role=args.aws_config.role,
instance_count=1,
instance_type=self.graphconstruct_instance_type_param,
command=["python3"],
sagemaker_session=self.pipeline_session,
+ volume_size_in_gb=self.volume_size_gb_param,
)

gconstruct_s3_output = Join(
@@ -298,7 +310,9 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep:
gc_local_input_path = "/opt/ml/processing/input"
# GConstruct should always be the first step and start with the source data
gc_proc_input = ProcessingInput(
- source=self.input_data_param, destination=gc_local_input_path
+ source=self.input_data_param,
+ destination=gc_local_input_path,
+ s3_input_mode='File',
)
gc_local_output_path = "/opt/ml/processing/output"
gc_proc_output = ProcessingOutput(
@@ -318,7 +332,6 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep:
self.graph_name_param,
"--num-parts",
self.instance_count_param.to_string(),
"--add-reverse-edges",
]

# TODO: Make this a pipeline parameter?
@@ -354,6 +367,7 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep:
instance_count=args.instance_config.gsprocessing_instance_count,
image_uri=args.aws_config.gsprocessing_pyspark_image_url,
sagemaker_session=self.pipeline_session,
+ volume_size_in_gb=self.volume_size_gb_param,
)

gsprocessing_output = Join(
@@ -427,12 +441,13 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep:
def _create_dist_part_step(self, args: PipelineArgs) -> ProcessingStep:
# Implementation for DistPartition step
dist_part_processor = ScriptProcessor(
- image_uri=args.aws_config.graphstorm_pytorch_image_url,
+ image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url,
role=args.aws_config.role,
instance_count=self.instance_count_param,
instance_type=self.cpu_instance_type_param,
command=["python3"],
sagemaker_session=self.pipeline_session,
+ volume_size_in_gb=self.volume_size_gb_param,
)

partition_output = Join(
@@ -484,12 +499,13 @@ def _create_dist_part_step(self, args: PipelineArgs) -> ProcessingStep:
def _create_gb_convert_step(self, args: PipelineArgs) -> ProcessingStep:
# Implementation for GraphBolt partition step
gb_part_processor = ScriptProcessor(
- image_uri=args.aws_config.graphstorm_pytorch_image_url,
+ image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url,
role=args.aws_config.role,
instance_count=1,
instance_type=self.graphconstruct_instance_type_param,
command=["python3"],
sagemaker_session=self.pipeline_session,
+ volume_size_in_gb=self.volume_size_gb_param,
)

gb_convert_arguments = [
@@ -506,6 +522,7 @@ def _create_gb_convert_step(self, args: PipelineArgs) -> ProcessingStep:
input_name="dist_graph_s3_input",
destination="/opt/ml/processing/dist_graph/",
source=self.next_step_data_input,
+ # GraphBolt conversion requires File mode
s3_input_mode="File",
)
],
@@ -530,7 +547,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
train_estimator = PyTorch(
entry_point=os.path.basename(args.script_paths.train_script),
source_dir=os.path.dirname(args.script_paths.train_script),
- image_uri=args.aws_config.graphstorm_pytorch_image_url,
+ image_uri=self.train_infer_image,
role=args.aws_config.role,
instance_count=self.instance_count_param,
instance_type=self.train_infer_instance,
@@ -548,6 +565,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep:
sagemaker_session=self.pipeline_session,
disable_profiler=True,
debugger_hook_config=False,
+ volume_size=self.volume_size_gb_param,
)

train_step = TrainingStep(
@@ -613,7 +631,7 @@ def _create_inference_step(self, args: PipelineArgs) -> TrainingStep:
inference_estimator = PyTorch(
entry_point=os.path.basename(args.script_paths.inference_script),
source_dir=os.path.dirname(args.script_paths.inference_script),
- image_uri=args.aws_config.graphstorm_pytorch_image_url,
+ image_uri=self.train_infer_image,
role=args.aws_config.role,
instance_count=self.instance_count_param,
instance_type=self.train_infer_instance,
@@ -622,6 +640,7 @@ def _create_inference_step(self, args: PipelineArgs) -> TrainingStep:
sagemaker_session=self.pipeline_session,
disable_profiler=True,
debugger_hook_config=False,
+ volume_size=self.volume_size_gb_param,
)

inference_step = TrainingStep(
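Note the SDK naming difference the changes above have to respect: ScriptProcessor takes volume_size_in_gb, while the PyTorch estimator takes volume_size; both accept the same InstanceVolumeSizeGB pipeline parameter.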
72 changes: 39 additions & 33 deletions sagemaker/pipeline/execute_sm_pipeline.py
@@ -38,73 +38,79 @@ def parse_args():
"--pipeline-name",
type=str,
required=True,
help="Name of the pipeline to execute",
help="Name of the pipeline to execute. Required.",
)
parser.add_argument("--region", type=str, required=False,
help="AWS region. Required for SageMaker execution.")
parser.add_argument(
"--async-execution", action="store_true",
help="Run pipeline asynchronously on SageMaker, return after printing execution ARN."
)
parser.add_argument(
"--local-execution",
action="store_true",
help="Use a local pipeline session to execute the pipeline.",
)
parser.add_argument(
"--pipeline-args-json-file",
type=str,
help=(
"If executing locally, provide a JSON representation of the pipeline arguments. "
"When executing locally, optionally provide a JSON representation of the pipeline arguments. "
"By default we look for '<pipeline-name>-pipeline-args.json' in the working dir."
),
)
parser.add_argument("--region", type=str, required=False, help="AWS region")


+ overrides = parser.add_argument_group(
+     "Pipeline overrides",
+     "Override default pipeline parameters at execution time.")

- # Optional override parameters
- parser.add_argument("--instance-count", type=int, help="Override instance count")
- parser.add_argument(
+ overrides.add_argument("--instance-count", type=int, help="Override instance count")
+ overrides.add_argument(
"--cpu-instance-type", type=str, help="Override CPU instance type"
)
- parser.add_argument(
+ overrides.add_argument(
"--gpu-instance-type", type=str, help="Override GPU instance type"
)
- parser.add_argument(
+ overrides.add_argument(
"--graphconstruct-instance-type",
type=str,
help="Override graph construction instance type",
)
- parser.add_argument(
+ overrides.add_argument(
"--graphconstruct-config-file",
type=str,
help="Override graph construction config file",
)
- parser.add_argument(
+ overrides.add_argument(
"--partition-algorithm",
type=str,
choices=["random", "parmetis"],
help="Override partition algorithm",
)
parser.add_argument("--graph-name", type=str, help="Override graph name")
parser.add_argument("--num-trainers", type=int, help="Override number of trainers")
parser.add_argument(
overrides.add_argument("--graph-name", type=str, help="Override graph name")
overrides.add_argument("--num-trainers", type=int, help="Override number of trainers")
overrides.add_argument(
"--use-graphbolt",
type=str,
choices=["true", "false"],
help="Override whether to use GraphBolt",
)
parser.add_argument("--input-data", type=str, help="Override input data S3 path")
parser.add_argument("--output-prefix", type=str, help="Override output prefix")
parser.add_argument(
"--train-config-file", type=str, help="Override train config file S3 path"
overrides.add_argument("--input-data", type=str, help="Override input data S3 path")
overrides.add_argument("--output-prefix", type=str, help="Override output prefix")
overrides.add_argument(
"--train-yaml-file", type=str, help="Override train yaml file S3 path"
)
- parser.add_argument(
-     "--inference-config-file",
+ overrides.add_argument(
+     "--inference-yaml-file",
type=str,
help="Override inference config file S3 path",
help="Override inference yaml file S3 path",
)
- parser.add_argument(
+ overrides.add_argument(
"--inference-model-snapshot", type=str, help="Override inference model snapshot"
)

- parser.add_argument(
-     "--async-execution", action="store_true", help="Run pipeline asynchronously"
- )
- parser.add_argument(
-     "--local-execution",
-     action="store_true",
-     help="Use a local pipeline session to run the pipeline.",
- )

return parser.parse_args()


@@ -163,10 +169,10 @@ def main():
execution_params["InputData"] = args.input_data
if args.output_prefix:
execution_params["OutputPrefix"] = args.output_prefix
- if args.train_config_file:
-     execution_params["TrainConfigFile"] = args.train_config_file
- if args.inference_config_file:
-     execution_params["InferenceConfigFile"] = args.inference_config_file
+ if args.train_yaml_file:
+     execution_params["TrainConfigFile"] = args.train_yaml_file
+ if args.inference_yaml_file:
+     execution_params["InferenceConfigFile"] = args.inference_yaml_file
if args.inference_model_snapshot:
execution_params["InferenceModelSnapshot"] = args.inference_model_snapshot

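Because the renamed flags still map to the existing TrainConfigFile/InferenceConfigFile pipeline parameters, and InstanceVolumeSizeGB is exposed the same way, an execution-time override might look like the following sketch (the pipeline, bucket, and file names are placeholders):

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(name="my-graphstorm-pipeline")  # an existing pipeline

# Roughly what execute_sm_pipeline.py does with --train-yaml-file
# and a larger instance volume:
execution = pipeline.start(
    parameters={
        "TrainConfigFile": "s3://my-bucket/configs/train.yaml",
        "InstanceVolumeSizeGB": 200,
    }
)
execution.wait()  # block until the pipeline finishes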
33 changes: 26 additions & 7 deletions sagemaker/pipeline/pipeline_parameters.py
@@ -42,7 +42,8 @@ class AWSConfig:

role: str
region: str
- graphstorm_pytorch_image_url: str
+ graphstorm_pytorch_cpu_image_url: str
+ graphstorm_pytorch_gpu_image_url: str
gsprocessing_pyspark_image_url: str


@@ -56,15 +57,16 @@ class InstanceConfig:
graph_construction_instance_type: str
gsprocessing_instance_count: int
train_on_cpu: bool
+ volume_size_gb: int

def __post_init__(self):
assert (
self.cpu_instance_type or self.gpu_instance_type
), "At least one instance type (CPU or GPU) should be specified."

- assert (
-     self.train_on_cpu and self.cpu_instance_type
- ), "Need to provide a CPU instance type when training on CPU"
+ if self.train_on_cpu:
+     assert self.cpu_instance_type, \
+         "Need to provide a CPU instance type when training on CPU"

if not self.graph_construction_instance_type:
self.graph_construction_instance_type = self.cpu_instance_type
Expand Down Expand Up @@ -215,6 +217,10 @@ def __post_init__(self):
f"got {self.instance_config.train_on_cpu=} "
f"{self.instance_config.gpu_instance_type=}"
)
+ assert self.aws_config.graphstorm_pytorch_gpu_image_url, (
+     "Must provide a GPU image when training on GPU. "
+     "Use --graphstorm-pytorch-gpu-image-url"
+ )

# Ensure we provide a GConstruct/GSProcessing config file when running construction
if (
@@ -366,10 +372,15 @@ def parse_pipeline_args() -> PipelineArgs:
"--region", type=str, required=True, help="AWS region. Required"
)
required_args.add_argument(
"--graphstorm-pytorch-image-url",
"--graphstorm-pytorch-cpu-image-url",
type=str,
required=True,
help="GraphStorm GConstruct/dist_part/train/inference ECR image URL. Required",
help="GraphStorm GConstruct/dist_part/train/inference CPU ECR image URL. Required",
)
+ optional_args.add_argument(
+     "--graphstorm-pytorch-gpu-image-url",
+     type=str,
+     help="GraphStorm GConstruct/dist_part/train/inference GPU ECR image URL.",
+ )
optional_args.add_argument(
"--gsprocessing-pyspark-image-url",
@@ -402,6 +413,12 @@ def parse_pipeline_args() -> PipelineArgs:
action="store_true",
help="Run training and inference on CPU instances instead of GPU",
)
+ optional_args.add_argument(
+     "--volume-size-gb",
+     type=int,
+     help="Additional volume size for instances in GB.",
+     default=100,
+ )

# Pipeline/Task Configuration
required_args.add_argument(
@@ -617,7 +634,8 @@ def parse_pipeline_args() -> PipelineArgs:
aws_config=AWSConfig(
role=args.role,
region=args.region,
- graphstorm_pytorch_image_url=args.graphstorm_pytorch_image_url,
+ graphstorm_pytorch_cpu_image_url=args.graphstorm_pytorch_cpu_image_url,
+ graphstorm_pytorch_gpu_image_url=args.graphstorm_pytorch_gpu_image_url,
gsprocessing_pyspark_image_url=args.gsprocessing_pyspark_image_url,
),
instance_config=InstanceConfig(
@@ -627,6 +645,7 @@ def parse_pipeline_args() -> PipelineArgs:
cpu_instance_type=args.cpu_instance_type,
gpu_instance_type=args.gpu_instance_type,
train_on_cpu=args.train_on_cpu,
+ volume_size_gb=args.volume_size_gb,
),
task_config=TaskConfig(
graph_name=args.graph_name,
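Taken together, a pipeline configuration now carries separate CPU/GPU images and a volume size. A sketch with illustrative values (the role ARN, image URIs, and instance types are placeholders, and InstanceConfig fields not shown in this diff are elided):

aws_config = AWSConfig(
    role="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
    region="us-east-1",
    graphstorm_pytorch_cpu_image_url="123456789012.dkr.ecr.us-east-1.amazonaws.com/graphstorm:sagemaker-cpu",
    graphstorm_pytorch_gpu_image_url="123456789012.dkr.ecr.us-east-1.amazonaws.com/graphstorm:sagemaker-gpu",
    gsprocessing_pyspark_image_url="123456789012.dkr.ecr.us-east-1.amazonaws.com/graphstorm-processing:latest",
)

instance_config = InstanceConfig(
    # ...instance-count fields elided...
    graph_construction_instance_type="ml.r5.4xlarge",
    gsprocessing_instance_count=2,
    cpu_instance_type="ml.m5.4xlarge",
    gpu_instance_type="ml.g5.4xlarge",
    train_on_cpu=False,
    volume_size_gb=100,  # new field: EBS volume size shared by all steps
)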
