diff --git a/python/graphstorm/gconstruct/file_io.py b/python/graphstorm/gconstruct/file_io.py index 5eed485372..b85aa41d1e 100644 --- a/python/graphstorm/gconstruct/file_io.py +++ b/python/graphstorm/gconstruct/file_io.py @@ -88,6 +88,8 @@ def expand_wildcard(data_files: List[str]) -> List[str]: """ expanded_files = [] + if len(data_files) == 1 and os.path.isdir(data_files[0]): + data_files = [os.path.join(data_files[0], "*")] for item in data_files: if '*' in item: matched_files = sorted(glob.glob(item)) diff --git a/sagemaker/pipeline/create_sm_pipeline.py b/sagemaker/pipeline/create_sm_pipeline.py index b7110907e9..ebbc00ada5 100644 --- a/sagemaker/pipeline/create_sm_pipeline.py +++ b/sagemaker/pipeline/create_sm_pipeline.py @@ -89,6 +89,13 @@ def __init__( if self.args.instance_config.train_on_cpu else self.gpu_instance_type_param ) + self.train_infer_image = ( + args.aws_config.graphstorm_pytorch_cpu_image_url + if self.args.instance_config.train_on_cpu + else + args.aws_config.graphstorm_pytorch_gpu_image_url + + ) def _get_or_create_pipeline_session( self, input_session: Optional[PipelineSession] = None @@ -184,6 +191,10 @@ def _create_pipeline_parameters(self, args: PipelineArgs): "GraphConstructInstanceType", args.instance_config.graph_construction_instance_type, ) + self.volume_size_gb_param = self._create_int_parameter( + "InstanceVolumeSizeGB", + args.instance_config.volume_size_gb, + ) self.graphconstruct_config_param = self._create_string_parameter( "GraphConstructConfigFile", args.graph_construction_config.config_filename ) @@ -279,12 +290,13 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep: ) gconstruct_processor = ScriptProcessor( - image_uri=args.aws_config.graphstorm_pytorch_image_url, + image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url, role=args.aws_config.role, instance_count=1, instance_type=self.graphconstruct_instance_type_param, command=["python3"], sagemaker_session=self.pipeline_session, + volume_size_in_gb=self.volume_size_gb_param, ) gconstruct_s3_output = Join( @@ -298,7 +310,9 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep: gc_local_input_path = "/opt/ml/processing/input" # GConstruct should always be the first step and start with the source data gc_proc_input = ProcessingInput( - source=self.input_data_param, destination=gc_local_input_path + source=self.input_data_param, + destination=gc_local_input_path, + s3_input_mode='File', ) gc_local_output_path = "/opt/ml/processing/output" gc_proc_output = ProcessingOutput( @@ -318,7 +332,6 @@ def _create_gconstruct_step(self, args: PipelineArgs) -> ProcessingStep: self.graph_name_param, "--num-parts", self.instance_count_param.to_string(), - "--add-reverse-edges", ] # TODO: Make this a pipeline parameter? @@ -354,6 +367,7 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep: instance_count=args.instance_config.gsprocessing_instance_count, image_uri=args.aws_config.gsprocessing_pyspark_image_url, sagemaker_session=self.pipeline_session, + volume_size_in_gb=self.volume_size_gb_param, ) gsprocessing_output = Join( @@ -427,12 +441,13 @@ def _create_gsprocessing_step(self, args: PipelineArgs) -> ProcessingStep: def _create_dist_part_step(self, args: PipelineArgs) -> ProcessingStep: # Implementation for DistPartition step dist_part_processor = ScriptProcessor( - image_uri=args.aws_config.graphstorm_pytorch_image_url, + image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url, role=args.aws_config.role, instance_count=self.instance_count_param, instance_type=self.cpu_instance_type_param, command=["python3"], sagemaker_session=self.pipeline_session, + volume_size_in_gb=self.volume_size_gb_param, ) partition_output = Join( @@ -484,12 +499,13 @@ def _create_dist_part_step(self, args: PipelineArgs) -> ProcessingStep: def _create_gb_convert_step(self, args: PipelineArgs) -> ProcessingStep: # Implementation for GraphBolt partition step gb_part_processor = ScriptProcessor( - image_uri=args.aws_config.graphstorm_pytorch_image_url, + image_uri=args.aws_config.graphstorm_pytorch_cpu_image_url, role=args.aws_config.role, instance_count=1, instance_type=self.graphconstruct_instance_type_param, command=["python3"], sagemaker_session=self.pipeline_session, + volume_size_in_gb=self.volume_size_gb_param, ) gb_convert_arguments = [ @@ -506,6 +522,7 @@ def _create_gb_convert_step(self, args: PipelineArgs) -> ProcessingStep: input_name="dist_graph_s3_input", destination="/opt/ml/processing/dist_graph/", source=self.next_step_data_input, + # GraphBolt conversion requires File mode s3_input_mode="File", ) ], @@ -530,7 +547,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep: train_estimator = PyTorch( entry_point=os.path.basename(args.script_paths.train_script), source_dir=os.path.dirname(args.script_paths.train_script), - image_uri=args.aws_config.graphstorm_pytorch_image_url, + image_uri=self.train_infer_image, role=args.aws_config.role, instance_count=self.instance_count_param, instance_type=self.train_infer_instance, @@ -548,6 +565,7 @@ def _create_train_step(self, args: PipelineArgs) -> TrainingStep: sagemaker_session=self.pipeline_session, disable_profiler=True, debugger_hook_config=False, + volume_size=self.volume_size_gb_param, ) train_step = TrainingStep( @@ -613,7 +631,7 @@ def _create_inference_step(self, args: PipelineArgs) -> TrainingStep: inference_estimator = PyTorch( entry_point=os.path.basename(args.script_paths.inference_script), source_dir=os.path.dirname(args.script_paths.inference_script), - image_uri=args.aws_config.graphstorm_pytorch_image_url, + image_uri=self.train_infer_image, role=args.aws_config.role, instance_count=self.instance_count_param, instance_type=self.train_infer_instance, @@ -622,6 +640,7 @@ def _create_inference_step(self, args: PipelineArgs) -> TrainingStep: sagemaker_session=self.pipeline_session, disable_profiler=True, debugger_hook_config=False, + volume_size=self.volume_size_gb_param, ) inference_step = TrainingStep( diff --git a/sagemaker/pipeline/execute_sm_pipeline.py b/sagemaker/pipeline/execute_sm_pipeline.py index 5e5fbebed6..a63ff2ef75 100644 --- a/sagemaker/pipeline/execute_sm_pipeline.py +++ b/sagemaker/pipeline/execute_sm_pipeline.py @@ -38,73 +38,79 @@ def parse_args(): "--pipeline-name", type=str, required=True, - help="Name of the pipeline to execute", + help="Name of the pipeline to execute. Required.", + ) + parser.add_argument("--region", type=str, required=False, + help="AWS region. Required for SageMaker execution.") + parser.add_argument( + "--async-execution", action="store_true", + help="Run pipeline asynchronously on SageMaker, return after printing execution ARN." + ) + parser.add_argument( + "--local-execution", + action="store_true", + help="Use a local pipeline session to execute the pipeline.", ) parser.add_argument( "--pipeline-args-json-file", type=str, help=( - "If executing locally, provide a JSON representation of the pipeline arguments. " + "When executing locally, optionally provide a JSON representation of the pipeline arguments. " "By default we look for '-pipeline-args.json' in the working dir." ), ) - parser.add_argument("--region", type=str, required=False, help="AWS region") + + + overrides = parser.add_argument_group( + "Pipeline overrides", + "Override default pipeline parameters at execution time.") # Optional override parameters - parser.add_argument("--instance-count", type=int, help="Override instance count") - parser.add_argument( + overrides.add_argument("--instance-count", type=int, help="Override instance count") + overrides.add_argument( "--cpu-instance-type", type=str, help="Override CPU instance type" ) - parser.add_argument( + overrides.add_argument( "--gpu-instance-type", type=str, help="Override GPU instance type" ) - parser.add_argument( + overrides.add_argument( "--graphconstruct-instance-type", type=str, help="Override graph construction instance type", ) - parser.add_argument( + overrides.add_argument( "--graphconstruct-config-file", type=str, help="Override graph construction config file", ) - parser.add_argument( + overrides.add_argument( "--partition-algorithm", type=str, choices=["random", "parmetis"], help="Override partition algorithm", ) - parser.add_argument("--graph-name", type=str, help="Override graph name") - parser.add_argument("--num-trainers", type=int, help="Override number of trainers") - parser.add_argument( + overrides.add_argument("--graph-name", type=str, help="Override graph name") + overrides.add_argument("--num-trainers", type=int, help="Override number of trainers") + overrides.add_argument( "--use-graphbolt", type=str, choices=["true", "false"], help="Override whether to use GraphBolt", ) - parser.add_argument("--input-data", type=str, help="Override input data S3 path") - parser.add_argument("--output-prefix", type=str, help="Override output prefix") - parser.add_argument( - "--train-config-file", type=str, help="Override train config file S3 path" + overrides.add_argument("--input-data", type=str, help="Override input data S3 path") + overrides.add_argument("--output-prefix", type=str, help="Override output prefix") + overrides.add_argument( + "--train-yaml-file", type=str, help="Override train yaml file S3 path" ) - parser.add_argument( - "--inference-config-file", + overrides.add_argument( + "--inference-yaml-file", type=str, - help="Override inference config file S3 path", + help="Override inference yaml file S3 path", ) - parser.add_argument( + overrides.add_argument( "--inference-model-snapshot", type=str, help="Override inference model snapshot" ) - parser.add_argument( - "--async-execution", action="store_true", help="Run pipeline asynchronously" - ) - parser.add_argument( - "--local-execution", - action="store_true", - help="Use a local pipeline session to run the pipeline.", - ) - return parser.parse_args() @@ -163,10 +169,10 @@ def main(): execution_params["InputData"] = args.input_data if args.output_prefix: execution_params["OutputPrefix"] = args.output_prefix - if args.train_config_file: - execution_params["TrainConfigFile"] = args.train_config_file - if args.inference_config_file: - execution_params["InferenceConfigFile"] = args.inference_config_file + if args.train_yaml_file: + execution_params["TrainConfigFile"] = args.train_yaml_file + if args.inference_yaml_file: + execution_params["InferenceConfigFile"] = args.inference_yaml_file if args.inference_model_snapshot: execution_params["InferenceModelSnapshot"] = args.inference_model_snapshot diff --git a/sagemaker/pipeline/pipeline_parameters.py b/sagemaker/pipeline/pipeline_parameters.py index 8dc8f0f4d7..89580b394c 100644 --- a/sagemaker/pipeline/pipeline_parameters.py +++ b/sagemaker/pipeline/pipeline_parameters.py @@ -42,7 +42,8 @@ class AWSConfig: role: str region: str - graphstorm_pytorch_image_url: str + graphstorm_pytorch_cpu_image_url: str + graphstorm_pytorch_gpu_image_url: str gsprocessing_pyspark_image_url: str @@ -56,15 +57,16 @@ class InstanceConfig: graph_construction_instance_type: str gsprocessing_instance_count: int train_on_cpu: bool + volume_size_gb: int def __post_init__(self): assert ( self.cpu_instance_type or self.gpu_instance_type ), "At least one instance type (CPU or GPU) should be specified." - assert ( - self.train_on_cpu and self.cpu_instance_type - ), "Need to provide a CPU instance type when training on CPU" + if self.train_on_cpu: + assert self.cpu_instance_type, \ + "Need to provide a CPU instance type when training on CPU" if not self.graph_construction_instance_type: self.graph_construction_instance_type = self.cpu_instance_type @@ -215,6 +217,10 @@ def __post_init__(self): f"got {self.instance_config.train_on_cpu=} " f"{self.instance_config.gpu_instance_type=}" ) + assert self.aws_config.graphstorm_pytorch_cpu_image_url, ( + "Must use provide GPU image when training on GPU. " + "use --graphstorm-pytorch-gpu-image-url" + ) # Ensure we provide a GConstruct/GSProcessing config file when running construction if ( @@ -366,10 +372,15 @@ def parse_pipeline_args() -> PipelineArgs: "--region", type=str, required=True, help="AWS region. Required" ) required_args.add_argument( - "--graphstorm-pytorch-image-url", + "--graphstorm-pytorch-cpu-image-url", type=str, required=True, - help="GraphStorm GConstruct/dist_part/train/inference ECR image URL. Required", + help="GraphStorm GConstruct/dist_part/train/inference CPU ECR image URL. Required", + ) + optional_args.add_argument( + "--graphstorm-pytorch-gpu-image-url", + type=str, + help="GraphStorm GConstruct/dist_part/train/inference GPU ECR image URL.", ) optional_args.add_argument( "--gsprocessing-pyspark-image-url", @@ -402,6 +413,12 @@ def parse_pipeline_args() -> PipelineArgs: action="store_true", help="Run training and inference on CPU instances instead of GPU", ) + optional_args.add_argument( + "--volume-size-gb", + type=int, + help="Additional volume size for instances in GB.", + default=100, + ) # Pipeline/Task Configuration required_args.add_argument( @@ -617,7 +634,8 @@ def parse_pipeline_args() -> PipelineArgs: aws_config=AWSConfig( role=args.role, region=args.region, - graphstorm_pytorch_image_url=args.graphstorm_pytorch_image_url, + graphstorm_pytorch_cpu_image_url=args.graphstorm_pytorch_cpu_image_url, + graphstorm_pytorch_gpu_image_url=args.graphstorm_pytorch_gpu_image_url, gsprocessing_pyspark_image_url=args.gsprocessing_pyspark_image_url, ), instance_config=InstanceConfig( @@ -627,6 +645,7 @@ def parse_pipeline_args() -> PipelineArgs: cpu_instance_type=args.cpu_instance_type, gpu_instance_type=args.gpu_instance_type, train_on_cpu=args.train_on_cpu, + volume_size_gb=args.volume_size_gb, ), task_config=TaskConfig( graph_name=args.graph_name,