Commit ff21c2d: split column

jalencato committed Jan 25, 2024
1 parent 1012c25 commit ff21c2d
Showing 3 changed files with 82 additions and 39 deletions.
@@ -15,8 +15,7 @@
 """
 import logging
 
-from pyspark.sql import DataFrame
-
+from pyspark.sql import SparkSession, DataFrame
 from graphstorm_processing.config.feature_config_base import FeatureConfig
 from .dist_transformations import (
     DistributedTransformation,
@@ -58,9 +58,9 @@ def apply_norm(
 
     # Define the schema of your return type
     schema = StructType([
-        StructField("input_ids", ArrayType(IntegerType())),
-        StructField("attention_mask", ArrayType(IntegerType())),
-        StructField("token_type_ids", ArrayType(IntegerType()))
+        StructField("input_ids", ArrayType(IntegerType()), True),
+        StructField("attention_mask", ArrayType(IntegerType()), True),
+        StructField("token_type_ids", ArrayType(IntegerType()), True)
     ])
 
     # Define UDF
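
Note: the third positional argument to StructField is the nullable flag, which already defaults to True, so this hunk spells out the existing behavior rather than changing it. A minimal self-contained sketch of the same schema, assuming only pyspark is installed:

    # The schema from the hunk above, with nullability made explicit.
    # StructField(name, dataType, nullable=True): True is the default.
    from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

    schema = StructType([
        StructField("input_ids", ArrayType(IntegerType()), True),
        StructField("attention_mask", ArrayType(IntegerType()), True),
        StructField("token_type_ids", ArrayType(IntegerType()), True),
    ])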
@@ -73,16 +73,20 @@ def tokenize(text):
         # Tokenize the text
         t = tokenizer(text, max_length=max_seq_length, truncation=True, padding='max_length', return_tensors='np')
         token_type_ids = t.get('token_type_ids', np.zeros_like(t['input_ids'], dtype=np.int8))
-        result = {
-            'input_ids': t['input_ids'][0].tolist(),  # Convert tensor to list
-            'attention_mask': t['attention_mask'][0].astype(np.int8).tolist(),
-            'token_type_ids': token_type_ids[0].astype(np.int8).tolist()
-        }
+        result = (
+            t['input_ids'][0].tolist(),  # Convert tensor to list
+            t['attention_mask'][0].astype(np.int8).tolist(),
+            token_type_ids[0].astype(np.int8).tolist()
+        )
         return result
 
     # Apply the UDF to the DataFrame
     scaled_df = input_df.withColumn(cols[0], tokenize(input_df[cols[0]]))
 
+    scaled_df = scaled_df.select(
+        scaled_df[cols[0]].getItem("input_ids").alias("input_ids"),
+        scaled_df[cols[0]].getItem("attention_mask").alias("attention_mask"),
+        scaled_df[cols[0]].getItem("token_type_ids").alias("token_type_ids")
+    )
     return scaled_df
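
The added select/getItem block is the "split column" the commit message refers to: the UDF first packs the three tokenizer outputs into one struct-typed column, which is then flattened into three top-level columns. A runnable sketch of the same pattern with a toy tokenizer (toy_tokenize and the demo data are illustrative, not from the repository):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([("hello world",), ("graph data",)], ["text"])

    schema = StructType([
        StructField("input_ids", ArrayType(IntegerType()), True),
        StructField("attention_mask", ArrayType(IntegerType()), True),
        StructField("token_type_ids", ArrayType(IntegerType()), True),
    ])

    @udf(returnType=schema)
    def toy_tokenize(text):
        # Return a tuple in schema field order, mirroring the
        # dict-to-tuple change in the hunk above.
        ids = [len(word) for word in text.split()]
        return (ids, [1] * len(ids), [0] * len(ids))

    # Pack the struct into the text column, then split it into three columns.
    tokenized = df.withColumn("text", toy_tokenize(df["text"]))
    split_df = tokenized.select(
        tokenized["text"].getItem("input_ids").alias("input_ids"),
        tokenized["text"].getItem("attention_mask").alias("attention_mask"),
        tokenized["text"].getItem("token_type_ids").alias("token_type_ids"),
    )
    split_df.show(truncate=False)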


@@ -106,7 +110,6 @@ def __init__(
         self.bert_norm = normalizer
         self.bert_model = bert_model
         self.max_seq_length = max_seq_length
-        self.spark = spark
 
     def apply(self, input_df: DataFrame) -> DataFrame:
         scaled_df = apply_norm(self.cols, self.bert_norm, self.max_seq_length, input_df)
@@ -931,47 +931,88 @@ def _process_node_features(
             logging.info(
                 "Processing feat_name: '%s' feat_cols: %s", feat_conf.feat_name, feat_conf.cols
             )
 
             transformer = DistFeatureTransformer(feat_conf)
 
             transformed_feature_df = transformer.apply_transformation(nodes_df)
 
             # TODO: Remove hack with [feat_conf.feat_name]
             for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
                 node_transformation_start = perf_counter()
-                single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
-                    feat_col, feat_name
-                )
-
-                feature_output_path = os.path.join(
-                    self.output_prefix, f"node_data/{node_type}-{feat_name}"
-                )
-
-                logging.info(
-                    "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
-                )
-                path_list = self._write_df(
-                    single_feature_df, feature_output_path, out_format="parquet"
-                )
-
-                node_feature_metadata_dict = {
-                    "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
-                    "data": path_list,
-                }
-                node_type_feature_metadata[feat_name] = node_feature_metadata_dict
-
-                self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
-                    perf_counter() - node_transformation_start
-                )
-
-                feat_val = single_feature_df.take(1)[0].asDict()[feat_name]
-
-                if isinstance(feat_val, (int, float)):
-                    nfeat_size = 1
+                if feat_conf.feat_type == "bert":
+                    input_ids = transformed_feature_df.select("input_ids")
+                    attention_mask = transformed_feature_df.select("attention_mask")
+                    token_type_ids = transformed_feature_df.select("token_type_ids")
+
+                    for feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
+                        feature_output_path = os.path.join(
+                            self.output_prefix, f"node_data/{node_type}-{feat_name}"
+                        )
+                        logging.info(
+                            "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
+                        )
+                        if feat_name == "input_ids":
+                            write_udf = input_ids
+                        elif feat_name == "attention_mask":
+                            write_udf = attention_mask
+                        elif feat_name == "token_type_ids":
+                            write_udf = token_type_ids
+                        path_list = self._write_df(
+                            write_udf, feature_output_path, out_format="parquet"
+                        )
+
+                        node_feature_metadata_dict = {
+                            "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
+                            "data": path_list,
+                        }
+                        node_type_feature_metadata[feat_name] = node_feature_metadata_dict
+
+                        self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
+                            perf_counter() - node_transformation_start
+                        )
+
+                        feat_val = write_udf.take(1)[0].asDict()[feat_name]
+
+                        if isinstance(feat_val, (int, float)):
+                            nfeat_size = 1
+                        else:
+                            nfeat_size = len(feat_val)
+
+                        ntype_feat_sizes.update({feat_name: nfeat_size})
                 else:
-                    nfeat_size = len(feat_val)
-
-                ntype_feat_sizes.update({feat_name: nfeat_size})
+                    single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
+                        feat_col, feat_name
+                    )
+
+                    feature_output_path = os.path.join(
+                        self.output_prefix, f"node_data/{node_type}-{feat_name}"
+                    )
+
+                    logging.info(
+                        "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
+                    )
+                    path_list = self._write_df(
+                        single_feature_df, feature_output_path, out_format="parquet"
+                    )
+
+                    node_feature_metadata_dict = {
+                        "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
+                        "data": path_list,
+                    }
+                    node_type_feature_metadata[feat_name] = node_feature_metadata_dict
+
+                    self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
+                        perf_counter() - node_transformation_start
+                    )
+
+                    feat_val = single_feature_df.take(1)[0].asDict()[feat_name]
+
+                    if isinstance(feat_val, (int, float)):
+                        nfeat_size = 1
+                    else:
+                        nfeat_size = len(feat_val)
+
+                    ntype_feat_sizes.update({feat_name: nfeat_size})
 
         return node_type_feature_metadata, ntype_feat_sizes
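
Both branches end by probing a single row to record each feature's width. A minimal sketch of that probe, factored out (infer_feature_size is an illustrative helper name, not from the repository):

    def infer_feature_size(feature_df, feat_name):
        # Sample one row; scalars count as width 1, array-valued features
        # as their length (e.g. max_seq_length for token-id arrays).
        feat_val = feature_df.take(1)[0].asDict()[feat_name]
        if isinstance(feat_val, (int, float)):
            return 1
        return len(feat_val)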

@@ -1333,7 +1374,7 @@ def _process_edge_features(
                     feat_conf.feat_name,
                     edge_type,
                 )
-                transformer = DistFeatureTransformer(feat_conf)
+                transformer = DistFeatureTransformer(feat_conf, self.spark)
 
                 transformed_feature_df = transformer.apply_transformation(edges_df)
 
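
The edge path now hands the active SparkSession to the transformer. A hedged sketch of what the updated constructor presumably accepts, inferred only from this call site (the stub below is an assumption, not the repository's class):

    from pyspark.sql import SparkSession

    class DistFeatureTransformer:  # illustrative stub of the assumed signature
        def __init__(self, feat_conf, spark: SparkSession):
            self.feat_conf = feat_conf
            self.spark = spark  # kept for transformations that need a session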
