[GSProcessing] Allow re-partitioning to run on the Spark leader
thvasilo committed Mar 13, 2024
1 parent 1057370 commit c72c2a9
Showing 11 changed files with 689 additions and 320 deletions.
22 changes: 7 additions & 15 deletions docs/source/gs-processing/gs-processing-getting-started.rst
@@ -112,11 +112,14 @@ cluster.

To use the library to process your data, you will need to have your data
in a tabular format, and a corresponding JSON configuration file that describes the
data. The input data can be in CSV (with header(s)) or Parquet format.
data. **The input data need to be in CSV (with header(s)) or Parquet format.**

The configuration file can be in GraphStorm's GConstruct format,
**with the caveat that the file paths need to be relative to the
location of the config file.** See :ref:`gsp-relative-paths` for more details.
location of the config file.** Also note that you'll need to convert
all your input data to CSV or Parquet files.

See :ref:`gsp-relative-paths` for more details.

After installing the library, executing a processing job locally can be done using:

@@ -125,20 +128,9 @@ After installing the library, executing a processing job locally can be done usi
gs-processing \
--config-filename gconstruct-config.json \
--input-prefix /path/to/input/data \
--output-prefix /path/to/output/data
Once the processing engine has processed the data, we want to ensure
they match the requirements of the DGL distributed partitioning
pipeline, so we need to run an additional script that will
make sure the produced data matches the assumptions of DGL [#f1]_.

.. note::

Ensure you pass the output path of the previous step as the input path here.

.. code-block:: bash
--output-prefix /path/to/output/data \
--repartition-on-leader True
gs-repartition --input-prefix /path/to/output/data
Once this script completes, the data are ready to be fed into DGL's distributed
partitioning pipeline.
15 changes: 12 additions & 3 deletions docs/source/gs-processing/usage/example.rst
@@ -110,7 +110,7 @@ Given the above we can run a job with local input data as:
.. code-block:: bash
> gs-processing --input-data /home/path/to/data \
--config-filename gconstruct-config.json
--config-filename gconstruct-config.json --repartition-on-leader True
The benefit of using relative paths is that we can move the same files
to any location, including S3, and run the same job without making changes to the config
@@ -175,12 +175,21 @@ we can use the following command to run the processing job locally:
gs-processing --config-filename gconstruct-config.json \
--input-prefix ./tests/resources/small_heterogeneous_graph \
--output-prefix /tmp/gsprocessing-example/
--output-prefix /tmp/gsprocessing-example/ \
--repartition-on-leader True
About re-partitioning
~~~~~~~~~~~~~~~~~~~~~

To finalize processing and to wrangle the data into the structure that
DGL distributed partitioning expects, we need an additional step that
guarantees the data conform to the expectations of DGL:
guarantees the data conform to the expectations of DGL, after the
Spark job is done.

We have the option to run this additional step on the Spark leader,
as shown above, by setting ``--repartition-on-leader`` to ``"True"``.
If our data are too large for the memory of the Spark leader,
we can instead run the re-partitioning as a separate job:

.. code-block:: bash
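# One possible invocation (assumed here): point --input-prefix at the
# output prefix of the gs-processing run above.
gs-repartition --input-prefix /tmp/gsprocessing-example/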
7 changes: 5 additions & 2 deletions docs/source/gs-processing/usage/row-count-alignment.rst
@@ -66,7 +66,10 @@ after the processing step. This step performs two functions:
Local repartitioning
--------------------

The simplest way to apply the re-partitioning step is to do so using a local
The simplest way to apply the re-partitioning step is to do so during the ``gs-processing`` step,
by passing the additional ``--repartition-on-leader True`` argument to our launch script.

Alternatively, we can run a local re-partitioning job using a local
installation of GSProcessing:

.. code-block:: bash
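# A minimal sketch of the standalone job (assumed invocation): pass the
# gs-processing output prefix as the input.
gs-repartition --input-prefix /path/to/output/data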
Expand Down Expand Up @@ -101,7 +104,7 @@ on SageMaker:
INSTANCE_TYPE="ml.t3.xlarge"
python scripts/run_repartitioning.py --s3-input-prefix ${PROCESSED_OUTPUT} \
--role ${ROLE} --image ${IMAGE_URI} --config-filename "metadata.json" \
--role ${ROLE} --image ${IMAGE_URI} \
--instance-type ${INSTANCE_TYPE} --wait-for-job
File streaming repartitioning
@@ -21,7 +21,7 @@
try:
from smspark.bootstrapper import Bootstrapper
except ImportError:
# smspark only exists on the Docker image
# smspark only exists on the SageMaker Docker image
class Bootstrapper: # type:ignore
# pylint: disable=all
def load_processing_job_config(self):
@@ -99,7 +99,7 @@ def create_spark_session(sm_execution: bool, filesystem_type: str) -> SparkSessi
# Avoid timeout errors due to connection pool starving
# Allow sending large results to driver
spark_builder = (
SparkSession.builder.appName("GraphlyticsGraphPreloading")
SparkSession.builder.appName("GSProcessing")
.config("spark.hadoop.validateOutputSpecs", "false")
.config("spark.driver.memory", f"{driver_mem_mb}m")
.config("spark.driver.memoryOverhead", f"{driver_mem_overhead_mb}m")
@@ -65,6 +65,12 @@
from graphstorm_processing.config.config_parser import create_config_objects
from graphstorm_processing.config.config_conversion import GConstructConfigConverter
from graphstorm_processing.data_transformations import spark_utils, s3_utils
from graphstorm_processing.repartition_files import (
repartition_files,
modify_flat_array_metadata,
ParquetRepartitioner,
)
from graphstorm_processing.graph_loaders.row_count_utils import verify_metadata_match


@dataclasses.dataclass
@@ -91,6 +97,10 @@ class ExecutorConfig:
The filesystem type, can be 'local' or 's3'.
add_reverse_edges : bool
Whether to create reverse edges for each edge type.
graph_name : str
The name of the graph being processed.
repartition_on_leader : bool
Whether to apply re-partitioning to the graph data on the Spark leader.
"""

local_config_path: str
@@ -103,6 +113,7 @@
filesystem_type: str
add_reverse_edges: bool
graph_name: str
repartition_on_leader: bool


@dataclasses.dataclass
@@ -116,6 +127,7 @@ class GSProcessingArguments:
add_reverse_edges: bool
log_level: str
graph_name: str
repartition_on_leader: bool


class DistributedExecutor:
@@ -142,6 +154,7 @@ def __init__(
self.sm_execution = executor_config.sm_execution
self.add_reverse_edges = executor_config.add_reverse_edges
self.graph_name = executor_config.graph_name
self.repartition_on_leader = executor_config.repartition_on_leader

# Ensure we have write access to the output path
if self.filesystem_type == "local":
@@ -224,9 +237,52 @@ def run(self) -> None:
enable_assertions=False,
graph_name=self.graph_name,
)
loader.load()
graph_meta_dict = loader.load()

t1 = time.time()
logging.info("Time to transform data for distributed partitioning: %s sec", t1 - t0)
# Stop the Spark context
self.spark.stop()

all_match = verify_metadata_match(graph_meta_dict)
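# When every output file's row counts already match, adjusting the Parquet
# metadata on the leader is enough; otherwise the files must be re-partitioned.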
repartitioner = ParquetRepartitioner(
self.output_prefix,
self.filesystem_type,
region=None,
verify_outputs=True,
streaming_repartitioning=False,
)

if all_match:
logging.info(
"All file row counts match, applying Parquet metadata modification on leader..."
)
modify_flat_array_metadata(graph_meta_dict, repartitioner)
logging.info("Data are now ready to be fed to DistPart pipeline")
else:
if self.repartition_on_leader:
logging.info("Attempting to repartition graph data on Spark leader...")
try:
updated_metadata = repartition_files(graph_meta_dict, repartitioner)
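# Persist the corrected row counts alongside the rest of the loader output
# so the distributed partitioning pipeline picks up the updated metadata.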
with open(
os.path.join(loader.output_path, "updated_row_counts_metadata.json"),
"w",
encoding="utf-8",
) as f:
json.dump(updated_metadata, f, indent=4)
f.flush()
logging.info("Data are now ready to be fed to DistPart pipeline")
except Exception as e: # pylint: disable=broad-exception-caught
logging.error(
"Failed to repartition data on Spark leader, "
"will need to run follow-up re-partition job. "
"Original error: %s",
str(e),
)
else:
logging.warning("gs-repartition will need to run as a follow-up job on the data!")

# This is used to upload the output JSON files to S3 on local runs,
# This is used to upload the output JSON files to S3 on non-SageMaker runs,
# since we can't rely on SageMaker to do it
if not self.sm_execution and self.filesystem_type == "s3":
bucket, s3_prefix = s3_utils.extract_bucket_and_key(self.output_prefix)
@@ -240,10 +296,6 @@ def run(self) -> None:
f"{s3_prefix}/{output_file}",
)

t1 = time.time()
logging.info("[Prof-info] graph loading time %s", t1 - t0)
self.spark.stop()


def parse_args() -> argparse.Namespace:
"""Parse the arguments for the execution."""
@@ -297,6 +349,15 @@ def parse_args() -> argparse.Namespace:
help="Logging level, default is INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
parser.add_argument(
"--repartition-on-leader",
type=lambda x: (str(x).lower() in ["true", "1"]),
default=False,
help=(
"When set to 'True', will try to re-partition the output "
"data on the Spark leader if necessary"
),
)

return parser.parse_args()

@@ -382,6 +443,7 @@ def main():
filesystem_type=filesystem_type,
add_reverse_edges=gsprocessing_args.add_reverse_edges,
graph_name=gsprocessing_args.graph_name,
repartition_on_leader=gsprocessing_args.repartition_on_leader,
)

dist_executor = DistributedExecutor(executor_configuration)
@@ -391,6 +453,7 @@
# Save arguments to file for posterity
with open(os.path.join(local_output_path, "launch_arguments.json"), "w", encoding="utf-8") as f:
json.dump(dataclasses.asdict(gsprocessing_args), f, indent=4)
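# Flush so the arguments file is fully written before any upload below.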
f.flush()

# In SageMaker execution, all files under `local_output_path` get automatically
# uploaded to S3 at the end of the job. For local execution with S3 data, we
@@ -143,7 +143,7 @@ def __init__(

def process_and_write_graph_data(
self, data_configs: Mapping[str, Sequence[StructureConfig]]
) -> None:
) -> Dict:
"""Process and encode all graph data.
Extracts and encodes graph structure before writing to storage, then applies pre-processing
@@ -157,6 +157,22 @@ def process_and_write_graph_data(
----------
data_configs : Mapping[str, Sequence[StructureConfig]]
Dictionary of configuration for nodes and edges
Returns
-------
metadata_dict : Dict
Dictionary of metadata for the graph, in "chunked-graph"
format, with additional keys.
For chunked graph format see
https://docs.dgl.ai/guide/distributed-preprocessing.html#specification
The dict also contains a "raw_id_mappings" key, which is a dict
of dicts, one for each node type. Each entry contains file information
about the raw-to-integer ID mapping for each node.
The returned value also contains an additional dict of dicts,
"graph_info", which contains additional information about the
graph in a more readable format.
"""
# TODO: See if it's better to return some data structure
# for the follow-up steps instead of just having side-effects
@@ -244,6 +260,8 @@ def process_and_write_graph_data(

logging.info("Finished Distributed Graph Processing ...")

return metadata_dict

@staticmethod
def _at_least_one_label_exists(data_configs: Mapping[str, Sequence[StructureConfig]]) -> bool:
"""
@@ -939,7 +957,7 @@ def _process_node_features(

transformed_feature_df = transformer.apply_transformation(nodes_df)

def process_feature(self, feat_name, single_feature_df, node_type, transformer_name):
def write_processed_feature(feat_name, single_feature_df, node_type, transformer_name):
feature_output_path = os.path.join(
self.output_prefix, f"node_data/{node_type}-{feat_name}"
)
@@ -974,8 +992,7 @@ def process_feature(self, feat_name, single_feature_df, node_type, transformer_n
):
for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
single_feature_df = transformed_feature_df.select(bert_feat_name)
process_feature(
self,
write_processed_feature(
bert_feat_name,
single_feature_df,
node_type,
@@ -985,8 +1002,7 @@ def process_feature(self, feat_name, single_feature_df, node_type, transformer_n
single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
feat_col, feat_name
)
process_feature(
self,
write_processed_feature(
feat_name,
single_feature_df,
node_type,
@@ -1665,5 +1681,5 @@ def write_mask(kind: str, mask_df: DataFrame) -> Sequence[str]:

return split_metadata

def load(self):
self.process_and_write_graph_data(self._data_configs)
def load(self) -> Dict:
return self.process_and_write_graph_data(self._data_configs)
