[GSProcessing] Move to Spark 3.5, bump version to 0.3.0, add homogeneous edge mapping optimization #791

Merged · 2 commits · Apr 3, 2024
55 changes: 55 additions & 0 deletions graphstorm-processing/docker/0.3.0/emr-serverless/Dockerfile.cpu
@@ -0,0 +1,55 @@
ARG ARCH=x86_64
FROM public.ecr.aws/emr-serverless/spark/emr-7.0.0:20240206-${ARCH} as base

USER root
ENV PYTHON_VERSION=3.9.18

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8


FROM base AS arch-x86_64

FROM base AS arch-arm64
RUN yum install -y python3-devel && \
rm -rf /var/cache/yum

FROM arch-${ARCH} AS runtime


WORKDIR /usr/lib/spark/code/

# Install GSProcessing requirements to pyenv Python
COPY requirements.txt requirements.txt
# Use --mount=type=cache,target=/root/.cache when Buildkit CI issue is fixed:
# https://github.com/moby/buildkit/issues/1512
RUN pip install -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Install the Hugging Face model cache if needed
ARG MODEL=""
ENV HF_HOME=/home/hadoop/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
        echo "Skip installing model cache"; \
    else \
        echo "Installing model cache for $MODEL" && \
        python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
        python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
    fi


# GSProcessing codebase
COPY code/ /usr/lib/spark/code/

FROM runtime AS prod
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache

FROM runtime AS test
RUN python -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ && rm -rf /root/.cache

USER hadoop:hadoop
WORKDIR /home/hadoop
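
Note on the optional model-cache step above: passing a `MODEL` build arg pre-downloads the Hugging Face tokenizer and model weights into `HF_HOME` at image build time, so the job does not fetch them over the network at startup. A minimal Python sketch of what that `RUN` step does (the model name below is a hypothetical example, not something set by this PR):

```python
# Sketch: pre-populate the Hugging Face cache at image build time.
# Assumes `transformers` is installed; HF_HOME is set before transformers is imported.
import os

os.environ.setdefault("HF_HOME", "/home/hadoop/.cache/huggingface/hub")

from transformers import AutoModel, AutoTokenizer  # noqa: E402  (import after env setup)

MODEL_NAME = "bert-base-uncased"  # hypothetical stand-in for the MODEL build arg

# from_pretrained() downloads once and stores the tokenizer files and weights under HF_HOME,
# so later calls inside the Spark job resolve from the local cache.
AutoTokenizer.from_pretrained(MODEL_NAME)
AutoModel.from_pretrained(MODEL_NAME)
```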
55 changes: 55 additions & 0 deletions graphstorm-processing/docker/0.3.0/sagemaker/Dockerfile.cpu
@@ -0,0 +1,55 @@
# syntax=docker/dockerfile:experimental
FROM 153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py39-v1.0 AS base

# Python won’t try to write .pyc or .pyo files on the import of source modules
# Force stdin, stdout and stderr to be totally unbuffered. Good for logging
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8
ENV LANG=C.UTF-8
ENV LC_ALL=C.UTF-8
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib"
ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib"
ENV PATH=/opt/conda/bin:$PATH
ENV PIP_NO_CACHE_DIR=1

WORKDIR /usr/lib/spark/code/

# Install GSProcessing dependencies to system Python 3.9
COPY requirements.txt requirements.txt
RUN /usr/local/bin/python3.9 -m pip install --no-cache-dir -r /usr/lib/spark/code/requirements.txt \
&& rm -rf /root/.cache

# Graphloader codebase
COPY code/ /usr/lib/spark/code/

# Base container assumes this is the workdir
ENV SPARK_HOME /usr/lib/spark
WORKDIR $SPARK_HOME

# Ensure our python3 installation is the one used
RUN echo 'alias python3=python3.9' >> ~/.bashrc

# Install the Hugging Face model cache if needed
ARG MODEL=""
ENV HF_HOME=/root/.cache/huggingface/hub
RUN if [ -z "${MODEL}" ]; then \
        echo "Skip installing model cache"; \
    else \
        echo "Installing model cache for $MODEL" && \
        python3 -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('${MODEL}')"; \
        python3 -c "from transformers import AutoModel; AutoModel.from_pretrained('${MODEL}')"; \
    fi

# Starts framework
ENTRYPOINT ["bash", "/usr/lib/spark/code/docker-entry.sh"]

FROM base AS prod
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm_processing-*.whl && \
rm /usr/lib/spark/code/graphstorm_processing-*.whl && rm -rf /root/.cache
CMD ["gs-processing"]

FROM base AS test
RUN python3 -m pip install --no-deps /usr/lib/spark/code/graphstorm-processing/ mock pytest && \
rm -rf /root/.cache
CMD ["sh", "-c", "pytest /usr/lib/spark/code/graphstorm-processing/tests/"]
2 changes: 1 addition & 1 deletion graphstorm-processing/docker/push_gsprocessing_image.sh
@@ -98,7 +98,7 @@ cleanup() {
# script cleanup here
}

parse_params "$@"
parse_params "${@}"

if [[ ${EXEC_ENV} == "sagemaker" || ${EXEC_ENV} == "emr-serverless" ]]; then
: # Do nothing
@@ -101,7 +101,7 @@ class ExecutorConfig:
Whether to create reverse edges for each edge type.
graph_name: str
The name of the graph being processed
apply_repartition: bool
do_repartition: bool
Whether to apply repartitioning to the graph on the Spark leader.
"""

@@ -247,7 +247,6 @@ def run(self) -> None:
data_configs = create_config_objects(self.graph_config_dict)

t0 = time.time()
logging.info("Constructing DGLGraph for Heterogeneous Graph")
# Prefer explicit arguments for clarity
loader = DistHeterogeneousGraphLoader(
spark=self.spark,
@@ -818,12 +818,16 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
self.graph_info["ntype_label"] = []
self.graph_info["ntype_label_property"] = []
for node_config in node_configs:
logging.info("node_config: %s", node_config)
files = node_config.files
file_paths = [f"{self.input_prefix}/{f}" for f in files]

node_type = node_config.ntype
node_col = node_config.node_col
logging.info(
"Processing data for node type %s with config: %s",
node_type,
node_config,
)

read_nodefile_start = perf_counter()
# TODO: Maybe we use same enforced type for Parquet and CSV
@@ -879,6 +883,7 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
NODE_MAPPING_INT
)
else:
logging.info("Creating node str-to-int mapping for node type: %s", node_type)
nodes_df = self.create_node_id_map_from_nodes_df(nodes_df, node_col)
self._write_nodeid_mapping_and_update_state(nodes_df, node_type)

@@ -949,10 +954,6 @@ def _process_node_features(
node_type_feature_metadata = {}
ntype_feat_sizes = {} # type: Dict[str, int]
for feat_conf in feature_configs:
logging.info(
"Processing feat_name: '%s' feat_cols: %s", feat_conf.feat_name, feat_conf.cols
)

transformer = DistFeatureTransformer(feat_conf)

transformed_feature_df = transformer.apply_transformation(nodes_df)
@@ -1025,6 +1026,11 @@ def _process_node_labels(
)
self.graph_info["is_multilabel"] = label_conf.multilabel
node_label_loader = DistLabelLoader(label_conf, self.spark)
logging.info(
"Processing label data for node type %s, label col: %s...",
node_type,
label_conf.label_column,
)
transformed_label = node_label_loader.process_label(nodes_df)
self.graph_info["label_map"] = node_label_loader.label_map

@@ -1044,7 +1050,11 @@

split_masks_output_prefix = f"{self.output_prefix}/node_data/{node_type}"

logging.info("Creating train/test/val split for node type %s...", node_type)
logging.info(
"Creating train/test/val split for node type %s, label col: %s...",
node_type,
label_conf.label_column,
)
if label_conf.split_rate:
split_rates = SplitRates(
train_rate=label_conf.split_rate["train"],
@@ -1089,11 +1099,6 @@ def write_edge_structure(
The first list contains the original edge files; the second contains the reversed
edge files and will be empty if `self.add_reverse_edges` is False.
"""
# TODO: An option for dealing with skewed data:
# Find the heavy hitter, collect, do broadcast join with just them,
# (for both sides of the edge and filter the rows?) then do the join with
# the rest, then concat the two (or write to storage separately, concat at read-time)

src_col = edge_config.src_col
src_ntype = edge_config.src_ntype
dst_col = edge_config.dst_col
@@ -1114,10 +1119,13 @@
)
.withColumnRenamed(NODE_MAPPING_INT, "src_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "src_str_id")
.repartition(self.num_output_files, F.col("src_str_id"))
)

# If edge is homogeneous, we'll re-use the same mapping for both src and dst
if src_ntype == dst_ntype:
src_node_id_mapping.cache()

# Join incoming edge df with mapping df to transform source str-ids to int ids
edge_df = edge_df.repartition(self.num_output_files, F.col(src_col))
edge_df_with_int_src = src_node_id_mapping.join(
edge_df,
src_node_id_mapping["src_str_id"] == edge_df[src_col],
@@ -1138,22 +1146,24 @@
intermediate_edge_count,
)

dst_node_id_mapping = (
self.spark.read.parquet(
*[
f"{self.output_prefix}/{dst_path}"
for dst_path in self.node_mapping_paths[dst_ntype]
]
if src_ntype == dst_ntype:
# Re-use mapping for homogeneous edges
dst_node_id_mapping = src_node_id_mapping.withColumnRenamed(
"src_int_id", "dst_int_id"
).withColumnRenamed("src_str_id", "dst_str_id")
else:
dst_node_id_mapping = (
self.spark.read.parquet(
*[
f"{self.output_prefix}/{dst_path}"
for dst_path in self.node_mapping_paths[dst_ntype]
]
)
.withColumnRenamed(NODE_MAPPING_INT, "dst_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "dst_str_id")
)
.withColumnRenamed(NODE_MAPPING_INT, "dst_int_id")
.withColumnRenamed(NODE_MAPPING_STR, "dst_str_id")
.repartition(self.num_output_files, F.col("dst_str_id"))
)
# Join the newly created src-int-id edge df with mapping
# df to transform destination str-ids to int ids
edge_df_with_int_src = edge_df_with_int_src.repartition(
self.num_output_files, F.col(dst_col)
)
edge_df_with_int_ids = dst_node_id_mapping.join(
edge_df_with_int_src,
dst_node_id_mapping["dst_str_id"] == edge_df_with_int_src[dst_col],
@@ -1173,6 +1183,8 @@

edge_structure_path = os.path.join(self.output_prefix, f"edges/{edge_type}")
logging.info("Writing edge structure for edge type %s...", edge_type)
if self.add_reverse_edges:
edge_df_with_only_int_ids.cache()
path_list = self._write_df(edge_df_with_only_int_ids, edge_structure_path)

if self.add_reverse_edges:
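
The key change in the hunks above is the homogeneous edge mapping optimization: when source and destination node types are identical, the loader caches the source str-to-int mapping and reuses it for the destination side instead of re-reading the mapping Parquet files, and it also caches the int-id edge DataFrame when reverse edges will be written. A simplified PySpark sketch of the pattern (variable names follow the diff; `spark`, `edge_df`, the mapping paths, and the column-name constants are assumptions filled in for illustration, not the actual loader code):

```python
# Sketch of the homogeneous-edge mapping reuse, not the full DistHeterogeneousGraphLoader code.
from pyspark.sql import DataFrame, SparkSession

# Placeholder values; the real constants live in the GSProcessing codebase.
NODE_MAPPING_INT = "node_int_id"
NODE_MAPPING_STR = "node_str_id"


def join_edge_ids(spark: SparkSession, edge_df: DataFrame, mapping_paths: dict,
                  src_ntype: str, dst_ntype: str, src_col: str, dst_col: str) -> DataFrame:
    src_node_id_mapping = (
        spark.read.parquet(*mapping_paths[src_ntype])
        .withColumnRenamed(NODE_MAPPING_INT, "src_int_id")
        .withColumnRenamed(NODE_MAPPING_STR, "src_str_id")
    )

    if src_ntype == dst_ntype:
        # Homogeneous edge type: cache the mapping once and rename its columns for the dst side,
        # avoiding a second read (and shuffle) of the same mapping files.
        src_node_id_mapping.cache()
        dst_node_id_mapping = src_node_id_mapping.withColumnRenamed(
            "src_int_id", "dst_int_id"
        ).withColumnRenamed("src_str_id", "dst_str_id")
    else:
        dst_node_id_mapping = (
            spark.read.parquet(*mapping_paths[dst_ntype])
            .withColumnRenamed(NODE_MAPPING_INT, "dst_int_id")
            .withColumnRenamed(NODE_MAPPING_STR, "dst_str_id")
        )

    # Translate the string ids on both edge endpoints to int ids via two joins.
    edge_df_with_int_src = src_node_id_mapping.join(
        edge_df, src_node_id_mapping["src_str_id"] == edge_df[src_col], how="inner"
    )
    edge_df_with_int_ids = dst_node_id_mapping.join(
        edge_df_with_int_src,
        dst_node_id_mapping["dst_str_id"] == edge_df_with_int_src[dst_col],
        how="inner",
    )
    return edge_df_with_int_ids.select("src_int_id", "dst_int_id")
```

Caching the mapping (and, in the diff, the int-id edge DataFrame when `add_reverse_edges` is set) trades executor memory for skipping a repeated Parquet scan and join shuffle, which is where the speedup for homogeneous graphs comes from.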
4 changes: 2 additions & 2 deletions graphstorm-processing/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "graphstorm_processing"
version = "0.2.2"
version = "0.3.0"
description = "Distributed graph pre-processing for GraphStorm"
readme = "README.md"
packages = [{include = "graphstorm_processing"}]
@@ -10,7 +10,7 @@ authors = [

[tool.poetry.dependencies]
python = "~3.9.12"
pyspark = ">=3.3.0, < 3.5.0"
pyspark = ">=3.3.0, < 3.6.0"
pyarrow = "~14.0.1"
boto3 = "~1.28.1"
joblib = "^1.3.1"
@@ -47,7 +47,7 @@
EC2 instance type for the processing job. (Default: 'ml.r5.4xlarge')
--add-reverse-edges: str
When set to "True", will create reverse edges for every edge type. (Default: "True")
--repartition-on-leader: str
--do-repartition: str
When set to "True", will repartition the graph files on the leader node if needed.
(Default: "True")
--num-output-files: int