[GSProcessing] Move to Spark 3.5, bump version to 0.3.0, add homogeneous edge mapping optimization (#791)

*Issue #, if available:*

*Description of changes:*

* Optimizations for edge re-mapping (see the sketch after this list):
  * When an edge type is homogeneous (same src and dst node type), we cache the node id mapping DF and re-use it for both joins, instead of loading it from storage twice.
  * Remove enforced re-partitions where possible, because they would trigger shuffles of entire DFs.
  * Combined, these changes improved total job runtime by up to 33% on a random graph with 10B edges and 1B nodes.
* Bumps GSProcessing version to 0.3.0.
* Allows Spark up to 3.5 in the project dependencies.
* Adds 0.3.0 Dockerfiles with Spark 3.5 for EMR Serverless and SageMaker.
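
To make the first optimization concrete, here is a minimal, self-contained PySpark sketch of the idea. The local session, toy data, and variable names are illustrative only and are not the GSProcessing API; the actual change lives in `write_edge_structure`, shown in the diff below.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[2]").appName("homogeneous-edge-sketch").getOrCreate()
)

# Stand-in for the node str-to-int mapping that GSProcessing normally reads from Parquet.
src_node_id_mapping = spark.createDataFrame(
    [("n0", 0), ("n1", 1), ("n2", 2)], ["src_str_id", "src_int_id"]
)
# Toy edge list for a homogeneous edge type (same src and dst node type).
edge_df = spark.createDataFrame([("n0", "n1"), ("n1", "n2")], ["src", "dst"])

src_ntype = dst_ntype = "user"  # homogeneous in this toy example

if src_ntype == dst_ntype:
    # Cache the mapping, since it is joined against both the src and dst columns;
    # this avoids reading (or recomputing) it a second time.
    src_node_id_mapping.cache()
    dst_node_id_mapping = src_node_id_mapping.withColumnRenamed(
        "src_str_id", "dst_str_id"
    ).withColumnRenamed("src_int_id", "dst_int_id")
else:
    # Heterogeneous case: the dst node type has its own mapping (another Parquet read).
    dst_node_id_mapping = spark.createDataFrame(
        [("m0", 0), ("m1", 1)], ["dst_str_id", "dst_int_id"]
    )

# Two joins replace the string ids with int ids. Note there are no enforced
# .repartition() calls around the joins, which previously triggered full-DF shuffles.
edges_with_src_int = edge_df.join(
    src_node_id_mapping, edge_df["src"] == src_node_id_mapping["src_str_id"], "inner"
)
edges_int = edges_with_src_int.join(
    dst_node_id_mapping,
    edges_with_src_int["dst"] == dst_node_id_mapping["dst_str_id"],
    "inner",
).select("src_int_id", "dst_int_id")
edges_int.show()
```

The caching only pays off for homogeneous edge types, where the same mapping DF feeds both joins; for heterogeneous edge types the dst mapping is still read separately, as in the `write_edge_structure` hunks further down.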

*Testing*

pytest, test jobs on SageMaker and EMR Serverless using the latest image versions
with ml-100k, and a large-scale test on SageMaker with a
1B-edges-100M-nodes-1024feat graph.

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
thvasilo authored Apr 3, 2024
1 parent d9a76d8 commit 842b6f5
Showing 7 changed files with 153 additions and 32 deletions.
55 changes: 55 additions & 0 deletions graphstorm-processing/docker/0.3.0/emr-serverless/Dockerfile.cpu
@@ -0,0 +1,55 @@
ARG ARCH=x86_64
FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:20240206-${ARCH} as base

USER root
ENV PYTHON_VERSION=3.9.18

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8


FROM base AS arch-x86_64

FROM base AS arch-arm64
RUN yum install -y python3-devel && \
rm -rf /var/cache/yum

FROM arch-${ARCH} AS runtime


WORKDIR /usr/lib/spark/code/

# Install GSProcessing requirements to pyenv Python
COPY requirements.txt requirements.txt
# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed:
# https://github.com/moby/buildkit/issues/1512
RUN pip install -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Install Huggingface model cache if it is necessary
ARG MODEL=""
ENV HF_HOME=/home/hadoop/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
echo "Skip installing model cache"; \
else \
echo "Installing model cache for $MODEL" && \
python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
fi


# GSProcessing codebase
COPY code/ /usr/lib/spark/code/

FROM runtime AS prod
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache

FROM runtime AS test
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache

USER hadoop:hadoop
WORKDIR /home/hadoop
55 changes: 55 additions & 0 deletions graphstorm-processing/docker/0.3.0/sagemaker/Dockerfile.cpu
@@ -0,0 +1,55 @@
# syntax=docker/dockerfile:experimental
FROM 153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py39-v1.0 AS base

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8
ENV LANG=C.UTF-8
ENV LC_ALL=C.UTF-8
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib"
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib"
ENV PATH=/opt/conda/bin:$PATH
ENV PIP_NO_CACHE_DIR=1

WORKDIR /usr/lib/spark/code/

# Install GSProcessing dependencies to system Python 3.9
COPY requirements.txt requirements.txt
RUN /usr/local/bin/python3.9 -m pip install --no-cache-dir -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Graphloader codebase
COPY code/ /usr/lib/spark/code/

# Base container assumes this is the workdir
ENV SPARK_HOME /usr/lib/spark
WORKDIR $SPARK_HOME

# Ensure our python3 installation is the one used
RUN echo 'alias python3=python3.9' >> ~/.bashrc

# Install Huggingface model cache if it is necessary
ARG MODEL=""
ENV HF_HOME=/root/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
echo "Skip installing model cache"; \
else \
echo "Installing model cache for $MODEL" && \
python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
fi

# Starts framework
ENTRYPOINT ["bash", "/usr/lib/spark/code/docker-entry.sh"]

FROM base AS prod
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache
CMD ["gs-processing"]

FROM base AS test
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ mock pytest && \
rm -rf /root/.cache
CMD ["sh", "-c", "pytest /usr/lib/spark/code/graphstorm-processing/tests/"]
2 changes: 1 addition & 1 deletion graphstorm-processing/docker/push_gsprocessing_image.sh
@@ -98,7 +98,7 @@ cleanup() {
# script cleanup here
}

parse_params "$@"
parse_params "${@}"

if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" ]]; then
: # Do nothing
@@ -101,7 +101,7 @@ class ExecutorConfig:
Whether to create reverse edges for each edge type.
graph_name: str
The name of the graph being processed
apply_repartition: bool
do_repartition: bool
Whether to apply repartitioning to the graph on the Spark leader.
"""

@@ -247,7 +247,6 @@ def run(self) -> None:
data_configs = create_config_objects(self.graph_config_dict)

t0 = time.time()
logging.info("Constructing DGLGraph for Heterogeneous Graph")
# Prefer explicit arguments for clarity
loader = DistHeterogeneousGraphLoader(
spark=self.spark,
@@ -818,12 +818,16 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
self.graph_info["ntype_label"] = []
self.graph_info["ntype_label_property"] = []
for node_config in node_configs:
logging.info("node_config: %s", node_config)
files = node_config.files
file_paths = [f"{self.input_prefix}/{f}" for f in files]

node_type = node_config.ntype
node_col = node_config.node_col
logging.info(
"Processing data for node type %s with config: %s",
node_type,
node_config,
)

read_nodefile_start = perf_counter()
# TODO: Maybe we use same enforced type for Parquet and CSV
@@ -879,6 +883,7 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
NODE_MAPPING_INT
)
else:
logging.info("Creating node str-to-int mapping for node type: %s", node_type)
nodes_df = self.create_node_id_map_from_nodes_df(nodes_df, node_col)
self._write_nodeid_mapping_and_update_state(nodes_df, node_type)

@@ -949,10 +954,6 @@ def _process_node_features(
node_type_feature_metadata = {}
ntype_feat_sizes = {} # type: Dict[str, int]
for feat_conf in feature_configs:
logging.info(
"Processing feat_name: '%s' feat_cols: %s", feat_conf.feat_name, feat_conf.cols
)

transformer = DistFeatureTransformer(feat_conf)

transformed_feature_df = transformer.apply_transformation(nodes_df)
@@ -1025,6 +1026,11 @@ def _process_node_labels(
)
self.graph_info["is_multilabel"] = label_conf.multilabel
node_label_loader = DistLabelLoader(label_conf, self.spark)
logging.info(
"Processing label data for node type %s, label col: %s...",
node_type,
label_conf.label_column,
)
transformed_label = node_label_loader.process_label(nodes_df)
self.graph_info["label_map"] = node_label_loader.label_map

@@ -1044,7 +1050,11 @@ def _process_node_labels(

split_masks_output_prefix = f"{self.output_prefix}/node_data/{node_type}"

logging.info("Creating train/test/val split for node type %s...", node_type)
logging.info(
"Creating train/test/val split for node type %s, label col: %s...",
node_type,
label_conf.label_column,
)
if label_conf.split_rate:
split_rates = SplitRates(
train_rate=label_conf.split_rate["train"],
@@ -1089,11 +1099,6 @@ def write_edge_structure(
The first list contains the original edge files, the second is the reversed
edge files, will be empty if `self.add_reverse_edges` is False.
"""
# TODO: An option for dealing with skewed data:
# Find the heavy hitter, collect, do broadcast join with just them,
# (for both sides of the edge and filter the rows?) then do the join with
# the rest, then concat the two (or write to storage separately, concat at read-time)

src_col = edge_config.src_col
src_ntype = edge_config.src_ntype
dst_col = edge_config.dst_col
@@ -1114,10 +1119,13 @@ def write_edge_structure(
)
.withColumnRenamed(NODE_MAPPING_INT, "src_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "src_str_id")
.repartition(self.num_output_files, F.col("src_str_id"))
)

# If edge is homogeneous, we'll re-use the same mapping for both src and dst
if src_ntype == dst_ntype:
src_node_id_mapping.cache()

# Join incoming edge df with mapping df to transform source str-ids to int ids
edge_df = edge_df.repartition(self.num_output_files, F.col(src_col))
edge_df_with_int_src = src_node_id_mapping.join(
edge_df,
src_node_id_mapping["src_str_id"] == edge_df[src_col],
@@ -1138,22 +1146,24 @@ def write_edge_structure(
intermediate_edge_count,
)

dst_node_id_mapping = (
self.spark.read.parquet(
*[
f"{self.output_prefix}/{dst_path}"
for dst_path in self.node_mapping_paths[dst_ntype]
]
if src_ntype == dst_ntype:
# Re-use mapping for homogeneous edges
dst_node_id_mapping = src_node_id_mapping.withColumnRenamed(
"src_int_id", "dst_int_id"
).withColumnRenamed("src_str_id", "dst_str_id")
else:
dst_node_id_mapping = (
self.spark.read.parquet(
*[
f"{self.output_prefix}/{dst_path}"
for dst_path in self.node_mapping_paths[dst_ntype]
]
)
.withColumnRenamed(NODE_MAPPING_INT, "dst_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "dst_str_id")
)
.withColumnRenamed(NODE_MAPPING_INT, "dst_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "dst_str_id")
.repartition(self.num_output_files, F.col("dst_str_id"))
)
# Join the newly created src-int-id edge df with mapping
# df to transform destination str-ids to int ids
edge_df_with_int_src = edge_df_with_int_src.repartition(
self.num_output_files, F.col(dst_col)
)
edge_df_with_int_ids = dst_node_id_mapping.join(
edge_df_with_int_src,
dst_node_id_mapping["dst_str_id"] == edge_df_with_int_src[dst_col],
@@ -1173,6 +1183,8 @@ def write_edge_structure(

edge_structure_path = os.path.join(self.output_prefix, f"edges/{edge_type}")
logging.info("Writing edge structure for edge type %s...", edge_type)
if self.add_reverse_edges:
edge_df_with_only_int_ids.cache()
path_list = self._write_df(edge_df_with_only_int_ids, edge_structure_path)

if self.add_reverse_edges:
4 changes: 2 additions & 2 deletions graphstorm-processing/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "graphstorm_processing"
version = "0.2.2"
version = "0.3.0"
description = "Distributed graph pre-processing for GraphStorm"
readme = "README.md"
packages = [{include = "graphstorm_processing"}]
@@ -10,7 +10,7 @@ authors = [

[tool.poetry.dependencies]
python = "~3.9.12"
pyspark = ">=3.3.0, < 3.5.0"
pyspark = ">=3.3.0, < 3.6.0"
pyarrow = "~14.0.1"
boto3 = "~1.28.1"
joblib = "^1.3.1"
@@ -47,7 +47,7 @@
EC2 instance type for the processing job. (Default: 'ml.r5.4xlarge')
--add-reverse-edges: str
When set to "True", will create reverse edges for every edge type. (Default: "True")
--repartition-on-leader: str
--do-repartition: str
When set to "True", will repartition the graph files on the leader node if needed.
(Default: "True")
--num-output-files: int