[GSProcessing] Fix bug in repartition on leader, fix mypy errors and latent bugs #915

Merged: 2 commits, Jul 11, 2024
@@ -30,14 +30,16 @@ def __init__(self, config_dict: Dict[str, Any]):
         self._label_column = ""
         assert config_dict["type"] == "link_prediction"
         self._task_type: str = config_dict["type"]
-        self._separator: str = config_dict["separator"] if "separator" in config_dict else None
+        self._separator: Optional[str] = (
+            config_dict["separator"] if "separator" in config_dict else None
+        )
         self._multilabel = self._separator is not None
+        self._custom_split_filenames: Dict[str, list[str]] = {}
+        self._split: Dict[str, float] = {}
         if "custom_split_filenames" not in config_dict:
-            self._split: Dict[str, float] = config_dict["split_rate"]
-            self._custom_split_filenames = None
+            self._split = config_dict["split_rate"]
         else:
-            self._split = None
-            self._custom_split_filenames: Dict[str, str] = config_dict["custom_split_filenames"]
+            self._custom_split_filenames = config_dict["custom_split_filenames"]

     def _sanity_check(self):
         if self._label_column == "":
@@ -85,7 +87,7 @@ def multilabel(self) -> bool:
         return self._multilabel

     @property
-    def custom_split_filenames(self) -> Dict[str, str]:
+    def custom_split_filenames(self) -> Dict[str, Any]:
         """The config for custom split labels."""
         return self._custom_split_filenames

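A note on the typing changes above: mypy rejects assigning None to attributes annotated with non-Optional types, and it also flags the same attribute being annotated in more than one branch. A minimal, self-contained sketch of the pattern the diff moves toward, with an illustrative class name rather than the project's actual config class:

from typing import Any, Dict, Optional


class ExampleLabelConfig:
    """Illustrative only: typed defaults declared once, then overwritten conditionally."""

    def __init__(self, config_dict: Dict[str, Any]):
        # Annotated once as Optional, so a missing key can legally map to None
        self._separator: Optional[str] = (
            config_dict["separator"] if "separator" in config_dict else None
        )
        # Typed empty defaults instead of None, so neither branch below
        # needs to re-annotate the attribute or assign an incompatible None
        self._split: Dict[str, float] = {}
        self._custom_split_filenames: Dict[str, Any] = {}
        if "custom_split_filenames" in config_dict:
            self._custom_split_filenames = config_dict["custom_split_filenames"]
        else:
            self._split = config_dict["split_rate"]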
@@ -75,7 +75,7 @@ def apply(self, input_df: DataFrame) -> DataFrame:
         epsilon = bucket_size / 10

         # TODO: Test if pyspark.ml.feature.Bucketizer covers our requirements and is faster
-        def determine_bucket_membership(value: float) -> List[int]:
+        def determine_bucket_membership(value: float) -> list[float]:
             # Create value range, value -> [value - slide/2, value + slide/2]
             high_val = value + self.slide_window_size / 2
             low_val = value - self.slide_window_size / 2
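The change above only touches the declared return type of determine_bucket_membership; the rest of its body is not shown. For orientation, a rough, independent sketch of a sliding-window bucket-membership helper of this shape (all parameters assumed, not the project's implementation), returning every bucket index the value's window overlaps:

def bucket_membership_sketch(
    value: float,
    min_val: float,
    bucket_size: float,
    num_buckets: int,
    slide_window_size: float,
) -> list[float]:
    # Create value range, value -> [value - slide/2, value + slide/2]
    high_val = value + slide_window_size / 2
    low_val = value - slide_window_size / 2
    # Clamp the window to the valid bucket range and collect the overlapped buckets
    low_idx = max(0, int((low_val - min_val) // bucket_size))
    high_idx = min(num_buckets - 1, int((high_val - min_val) // bucket_size))
    # Returned as floats purely to mirror the list[float] annotation in the diff;
    # the project's actual reason for that element type may differ.
    return [float(idx) for idx in range(low_idx, high_idx + 1)]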
@@ -56,6 +56,7 @@ def get_transformation_name() -> str:
return "DistCategoryTransformation"

def apply(self, input_df: DataFrame) -> DataFrame:
assert self.spark
processed_col_names = []
top_categories_per_col: dict[str, list] = {}

@@ -216,14 +217,16 @@ def create_zeroes_list(vec_size: int):
             return [0] * vec_size

         transformed_df = None
-        already_transformed_cols = []
+        already_transformed_cols: list[str] = []
         remaining_cols = list(self.cols)

         for col_idx, current_col in enumerate(precomputed_cols):
             vector_size = len(labels_arrays[col_idx])
             # Mapping from string to one-hot vector,
             # with all-zeroes default for unknown/missing values
-            string_to_vector = defaultdict(partial(create_zeroes_list, vector_size))
+            string_to_vector: dict[str, list[int]] = defaultdict(
+                partial(create_zeroes_list, vector_size)
+            )

             string_to_one_hot_idx = per_col_label_to_one_hot_idx[current_col]

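The typed defaultdict above maps category strings to one-hot vectors and falls back to an all-zeroes vector for unseen or missing categories. A small standalone sketch of that idiom, with illustrative data and no Spark dependency:

from collections import defaultdict
from functools import partial


def create_zeroes_list(vec_size: int) -> list[int]:
    return [0] * vec_size


labels = ["red", "green", "blue"]
vector_size = len(labels)
# Unknown keys get a fresh all-zeroes vector of the right length
string_to_vector: dict[str, list[int]] = defaultdict(
    partial(create_zeroes_list, vector_size)
)
for one_hot_idx, label in enumerate(labels):
    one_hot = create_zeroes_list(vector_size)
    one_hot[one_hot_idx] = 1
    string_to_vector[label] = one_hot

print(string_to_vector["green"])    # [0, 1, 0]
print(string_to_vector["unknown"])  # [0, 0, 0]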
@@ -57,7 +57,7 @@ def apply_transform(
f"than expected {tokenizer.model_max_length}"
)
# Define the schema of your return type
schema = StructType(
tokenize_schema = StructType(
[
StructField("input_ids", ArrayType(IntegerType())),
StructField("attention_mask", ArrayType(IntegerType())),
@@ -66,7 +66,7 @@ )
         )

         # Define UDF
-        @udf(returnType=schema)
+        @udf(returnType=tokenize_schema)
         def tokenize(text):
             # Check if text is a string
             if not isinstance(text, str):
@@ -98,7 +98,7 @@ def tokenize(text):
         )
     elif action == HUGGINGFACE_EMB:
         # Define the schema of your return type
-        schema = ArrayType(FloatType())
+        embedding_schema = ArrayType(FloatType())

         if th.cuda.is_available():
             gpu = (
@@ -123,7 +123,7 @@ def tokenize(text):
         lm_model = lm_model.to(device)

         # Define UDF
-        @udf(returnType=schema)
+        @udf(returnType=embedding_schema)
         def lm_emb(text):
             # Check if text is a string
             if not isinstance(text, str):
@@ -139,7 +139,7 @@ def lm_emb(text):
             )
             token_type_ids = outputs.get("token_type_ids")
             if token_type_ids is None:
-                token_type_ids = torch.zeros_like(outputs["input_ids"], dtype=torch.int8)
+                token_type_ids = th.zeros_like(outputs["input_ids"], dtype=th.int8)
             with th.no_grad():
                 lm_outputs = lm_model(
                     input_ids=outputs["input_ids"].to(device),
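On the renames above: giving each branch's return schema its own name (tokenize_schema, embedding_schema) avoids re-assigning one schema variable with an incompatible type, a typical mypy complaint, and the torch. to th. change matches the import alias already used in the surrounding code (th.no_grad, th.cuda.is_available). A minimal standalone sketch of a Spark UDF with an explicit struct return schema, using a fake tokenizer instead of Hugging Face:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

spark = SparkSession.builder.master("local[2]").appName("udf-schema-sketch").getOrCreate()

tokenize_schema = StructType(
    [
        StructField("input_ids", ArrayType(IntegerType())),
        StructField("attention_mask", ArrayType(IntegerType())),
    ]
)


@udf(returnType=tokenize_schema)
def fake_tokenize(text):
    # Stand-in for a real tokenizer: map each character to its code point
    ids = [ord(ch) for ch in text] if isinstance(text, str) else []
    # Tuple order matches the StructType field order above
    return ids, [1] * len(ids)


df = spark.createDataFrame([("hello",), ("world",)], ["text"])
df.select(fake_tokenize("text").alias("tokens")).show(truncate=False)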
@@ -44,6 +44,7 @@ def __init__(self, cols: Sequence[str], spark: SparkSession) -> None:
         self.spark = spark

     def apply(self, input_df: DataFrame) -> DataFrame:
+        assert self.spark
         processed_col_name = self.label_column + "_processed"

         str_indexer = StringIndexer(
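The assert self.spark added here (and in the category transformation above) is mostly for the type checker: if the attribute is declared Optional in the base transformation, an assert narrows it to a non-None type for the rest of the method. A generic sketch of the narrowing pattern, using str in place of SparkSession to stay dependency-free:

from typing import Optional


class NeedsResource:
    def __init__(self, resource: Optional[str] = None) -> None:
        # May legitimately be unset at construction time
        self.resource: Optional[str] = resource

    def apply(self) -> str:
        # Without this assert, mypy flags .upper() on an Optional value;
        # with it, self.resource is narrowed to str below this line.
        assert self.resource
        return self.resource.upper()


print(NeedsResource("spark").apply())  # SPARK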
@@ -128,9 +128,9 @@ def _configure_spark_env_memory(
     instance_type = processing_job_config["ProcessingResources"]["ClusterConfig"][
         "InstanceType"
     ].replace("ml.", "")
-    instance_type_info = instance_type_info[instance_type]
-    instance_mem_mb = instance_type_info["MemoryInfo"]["SizeInMiB"]
-    instance_cores = instance_type_info["VCpuInfo"]["DefaultVCpus"]
+    instance_type_details = instance_type_info[instance_type]
+    instance_mem_mb = instance_type_details["MemoryInfo"]["SizeInMiB"]
+    instance_cores = instance_type_details["VCpuInfo"]["DefaultVCpus"]
     logging.info(
         "Detected instance type: %s with total memory: %d MiB and total cores: %d",
         instance_type,
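The rename to instance_type_details avoids rebinding instance_type_info, the full lookup table, to a single instance's entry: that rebinding changes the variable's type mid-function (a mypy error) and would break any later lookup into the table. A compact reproduction of the hazard, with illustrative values:

instance_type_info = {
    "m5.4xlarge": {"MemoryInfo": {"SizeInMiB": 65536}, "VCpuInfo": {"DefaultVCpus": 16}},
    "r5.4xlarge": {"MemoryInfo": {"SizeInMiB": 131072}, "VCpuInfo": {"DefaultVCpus": 16}},
}

# Buggy version: after this, instance_type_info no longer refers to the table,
# so instance_type_info["r5.4xlarge"] would raise KeyError later on.
# instance_type_info = instance_type_info["m5.4xlarge"]

# Fixed version: keep the table and the selected entry under separate names.
instance_type_details = instance_type_info["m5.4xlarge"]
instance_mem_mb = instance_type_details["MemoryInfo"]["SizeInMiB"]
instance_cores = instance_type_details["VCpuInfo"]["DefaultVCpus"]
print(instance_mem_mb, instance_cores)  # 65536 16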
@@ -313,6 +313,7 @@ def run(self) -> None:
         )

         repartition_start = time.perf_counter()
+        updated_metadata = {}
         if all_match:
             logging.info(
                 "All file row counts match, applying Parquet metadata modification on Spark leader."
@@ -327,7 +328,7 @@
             try:
                 # Upload existing output files before trying to re-partition
                 if self.filesystem_type == FilesystemType.S3:
-                    self._upload_output_files(loader, force=True)
+                    self._upload_output_files(self.loader, force=True)
                 updated_metadata = repartition_files(graph_meta_dict, repartitioner)
             except Exception as e: # pylint: disable=broad-exception-caught
                 # If an error happens during re-partition, we don't want to fail the entire
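The updated_metadata = {} default added before the branch, together with the self.loader fix, matters because the re-partition work runs inside a try/except that deliberately logs and continues rather than failing the whole job; the default keeps the name bound for whatever code follows the block. A stripped-down sketch of that pattern:

import logging


def repartition_with_fallback(run_repartition) -> dict:
    # Bound up front, so the name exists even if every path below raises
    updated_metadata: dict = {}
    try:
        updated_metadata = run_repartition()
    except Exception:  # pylint: disable=broad-exception-caught
        # Log and continue instead of failing the entire job
        logging.exception("Re-partitioning failed, continuing with empty metadata")
    return updated_metadata


print(repartition_with_fallback(lambda: {"edge_data": 3}))  # {'edge_data': 3}
print(repartition_with_fallback(lambda: 1 / 0))             # {}  (error is logged)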
@@ -25,6 +25,7 @@
 from time import perf_counter
 from typing import Any, Dict, Optional, Set, Tuple

+from pyspark import RDD
 from pyspark.sql import Row, SparkSession, DataFrame, functions as F
 from pyspark.sql.types import (
     StructType,
@@ -1916,7 +1917,7 @@ def multinomial_sample(label_col: str) -> Sequence[int]:
         return train_mask_df, val_mask_df, test_mask_df

     def _create_split_files_custom_split(
-        self, input_df: DataFrame, custom_split_file: str
+        self, input_df: DataFrame, custom_split_file: CustomSplit
     ) -> tuple[DataFrame, DataFrame, DataFrame]:
         """
         Creates the train/val/test mask dataframe based on custom split files.
@@ -1925,9 +1926,11 @@
         ----------
         input_df: DataFrame
             Input dataframe for which we will create split masks.
-        custom_split_file: Optional[CustomSplit]
+        custom_split_file: CustomSplit
             A CustomSplit object including path to the custom split files for
             training/validation/test.
+        mask_type: str
+            The type of mask to create, value can be train, val or test.

         Returns
         -------
@@ -1937,7 +1940,7 @@

         # custom node/edge label
         # create custom mask dataframe for one of the types: train, val, test
-        def process_custom_mask_df(input_df, split_file, mask_type):
+        def process_custom_mask_df(input_df: DataFrame, split_file: CustomSplit, mask_type: str):
             if mask_type == "train":
                 file_path = split_file.train
             elif mask_type == "val":
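The helper above now takes a typed CustomSplit instead of a bare path string, which is what lets process_custom_mask_df pick the right file per mask type. CustomSplit's real definition is not shown in this diff; a stand-in dataclass is enough to illustrate the shape of the selection logic (field names beyond train are assumed):

from dataclasses import dataclass
from typing import Sequence


@dataclass
class CustomSplitSketch:
    """Stand-in for the real CustomSplit config: one file list per mask type."""

    train: Sequence[str]
    valid: Sequence[str]
    test: Sequence[str]


def select_split_files(split_file: CustomSplitSketch, mask_type: str) -> Sequence[str]:
    if mask_type == "train":
        return split_file.train
    if mask_type == "val":
        return split_file.valid
    if mask_type == "test":
        return split_file.test
    raise ValueError(f"Unknown mask type: {mask_type}")


split = CustomSplitSketch(train=["train_ids.json"], valid=["val_ids.json"], test=["test_ids.json"])
print(select_split_files(split, "val"))  # ['val_ids.json']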
@@ -325,7 +325,9 @@ def _repartition_parquet_files_in_memory(
             for idx in range(len(desired_counts))
         ]
         with Parallel(
-            n_jobs=min(NUM_WRITE_THREADS, os.cpu_count()), verbose=self.verbosity, prefer="threads"
+            n_jobs=min(NUM_WRITE_THREADS, os.cpu_count() or 16),
+            verbose=self.verbosity,
+            prefer="threads",
         ) as parallel:
             parallel(
                 delayed(self.write_parquet_to_relative_path)(
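The or 16 guard above exists because os.cpu_count() is documented to return None when the core count cannot be determined, and min(NUM_WRITE_THREADS, None) raises a TypeError. A tiny reproduction of the guard (the NUM_WRITE_THREADS value below is assumed for illustration):

import os

NUM_WRITE_THREADS = 16  # value assumed for illustration

# os.cpu_count() may return None, and min(int, None) raises TypeError,
# so fall back to a fixed parallelism when the CPU count is unknown.
n_jobs = min(NUM_WRITE_THREADS, os.cpu_count() or 16)
print(n_jobs)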
6 changes: 6 additions & 0 deletions graphstorm-processing/mypy.ini
@@ -18,3 +18,9 @@ ignore_missing_imports = True

 [mypy-smspark.*]
 ignore_missing_imports = True
+
+[mypy-transformers.*]
+ignore_missing_imports = True
+
+[mypy-scipy.*]
+ignore_missing_imports = True
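The two new sections extend the pattern already used for smspark: a [mypy-<package>.*] block with ignore_missing_imports = True silences only the "cannot find implementation or library stub" errors for that package, leaving all other checks in place. As a minimal illustration (not project code), the override covers imports like the one below when mypy cannot locate stubs for transformers; the per-line alternative would be a # type: ignore comment on the import:

from transformers import AutoTokenizer  # missing-stub error silenced by [mypy-transformers.*]


def load_tokenizer(model_name: str):
    # The mypy override affects type checking only; runtime behavior is unchanged.
    return AutoTokenizer.from_pretrained(model_name)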