Skip to content

Commit

Permalink
Move feature writing function to top-level, share between nodes and edges, add assertions.
Browse files Browse the repository at this point in the history
  • Loading branch information
thvasilo committed Aug 9, 2024
1 parent a0c7696 commit ae8bf55
Showing 1 changed file with 74 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,42 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
logging.info("Finished processing node features")
return node_data_dict

def _write_processed_feature(
    self,
    feat_name: str,
    single_feature_df: DataFrame,
    feature_output_path: str,
) -> tuple[dict, int]:
    """Write one processed feature DataFrame to storage and report its metadata.

    Shared by the node and edge feature processing paths.

    Parameters
    ----------
    feat_name : str
        Name of the (single) feature column in ``single_feature_df``.
    single_feature_df : DataFrame
        DataFrame holding exactly one column: the processed feature values.
    feature_output_path : str
        Output prefix under which the feature files are written.

    Returns
    -------
    tuple[dict, int]
        A metadata dict describing the output format and the list of written
        file paths, and the per-row feature dimension (1 for scalar features,
        the list length for vector features).
    """

    def _get_feat_size(feat_val) -> int:
        """Return the feature dimension of one sampled feature value."""
        assert isinstance(
            feat_val, (list, numbers.Number)
        ), f"We expect features to either be scalars or lists of scalars, got {type(feat_val)}."

        if isinstance(feat_val, list):
            for val in feat_val:
                assert isinstance(
                    val, numbers.Number
                ), f"We expect feature lists to be lists of scalars, got {type(val)}."

        # Scalars have dimension 1; vector features have their list length.
        return 1 if isinstance(feat_val, numbers.Number) else len(feat_val)

    logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path)
    path_list = self._write_df(single_feature_df, feature_output_path, out_format=FORMAT_NAME)

    # Generic name: this metadata shape is shared by node and edge features.
    feature_metadata_dict = {
        "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
        "data": path_list,
    }

    # Sample one row to determine the feature dimension. Guard explicitly so an
    # empty DataFrame fails with a clear message instead of an IndexError.
    sample_rows = single_feature_df.take(1)
    assert sample_rows, f"Got empty DataFrame for feature '{feat_name}', cannot determine feature size."
    feat_val = sample_rows[0].asDict().get(feat_name)
    nfeat_size = _get_feat_size(feat_val)

    return feature_metadata_dict, nfeat_size

def _process_node_features(
self, feature_configs: Sequence[FeatureConfig], nodes_df: DataFrame, node_type: str
) -> Tuple[Dict, Dict]:
Expand Down Expand Up @@ -1063,65 +1099,43 @@ def _process_node_features(
feat_conf.feat_name
] = json_representation

def write_processed_feature(
feat_name: str,
single_feature_df: DataFrame,
node_type: str,
transformer: DistFeatureTransformer,
):
feature_output_path = os.path.join(
self.output_prefix, f"node_data/{node_type}-{feat_name}"
)
logging.info(
"Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
)
path_list = self._write_df(
single_feature_df, feature_output_path, out_format=FORMAT_NAME
)

node_feature_metadata_dict = {
"format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
"data": path_list,
}
node_type_feature_metadata[feat_name] = node_feature_metadata_dict

feat_val = single_feature_df.take(1)[0].asDict().get(feat_name)
assert isinstance(
feat_val, (list, numbers.Number)
), "We expect features to either be scalars or lists of scalars."
nfeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
ntype_feat_sizes.update({feat_name: nfeat_size})

self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
perf_counter() - node_transformation_start
)

# TODO: Remove hack with [feat_conf.feat_name]
for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
node_transformation_start = perf_counter()

if (
feat_conf.feat_type == HUGGINGFACE_TRANFORM
and feat_conf.transformation_kwargs["action"] == HUGGINGFACE_TOKENIZE
):

for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
single_feature_df = transformed_feature_df.select(bert_feat_name)
write_processed_feature(
feature_output_path = os.path.join(
self.output_prefix,
f"node_data/{node_type}-{bert_feat_name}",
)
feat_meta, feat_size = self._write_processed_feature(
bert_feat_name,
single_feature_df,
node_type,
transformer,
feature_output_path,
)
node_type_feature_metadata[bert_feat_name] = feat_meta
ntype_feat_sizes.update({bert_feat_name: feat_size})
else:
single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
feat_col, feat_name
)
write_processed_feature(
feature_output_path = os.path.join(
self.output_prefix, f"node_data/{node_type}-{feat_name}"
)
feat_meta, feat_size = self._write_processed_feature(
feat_name,
single_feature_df,
node_type,
transformer,
feature_output_path,
)
node_type_feature_metadata[feat_name] = feat_meta
ntype_feat_sizes.update({feat_name: feat_size})
self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
perf_counter() - node_transformation_start
)

# Unpersist and move on to next feature
transformed_feature_df.unpersist()
Expand Down Expand Up @@ -1546,34 +1560,6 @@ def _process_edge_features(
feat_conf.feat_name
] = json_representation

# Pre-change nested helper (removed by this commit in favor of a shared
# top-level method): writes one processed edge feature to storage, records
# its output metadata and dimension, and stores the transform timing.
# NOTE(review): relies on closure variables from the enclosing method
# (edge_feature_metadata_dicts, etype_feat_sizes, edge_feature_start).
def write_feature(self, feat_name, single_feature_df, edge_type, transformer_name):
# Edge type strings contain ':' separators; replace them so the value is
# safe to embed in a filesystem path.
feature_output_path = os.path.join(
self.output_prefix, f"edge_data/{edge_type.replace(':', '_')}-{feat_name}"
)
logging.info(
"Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
)
# Persist the single-column feature DataFrame; returns the written file paths.
path_list = self._write_df(
single_feature_df, feature_output_path, out_format=FORMAT_NAME
)

edge_feature_metadata_dict = {
"format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
"data": path_list,
}
edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict

# Sample one value to derive the feature dimension: 1 for scalars,
# list length for vector features.
feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None)
assert isinstance(
feat_val, (list, numbers.Number)
), "We expect features to either be scalars or lists of scalars."
efeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
etype_feat_sizes.update({feat_name: efeat_size})

# Record wall-clock time for this edge feature's transformation.
self.timers[f"{transformer_name}-{edge_type}-{feat_name}"] = (
perf_counter() - edge_feature_start
)

# TODO: Remove hack with [feat_conf.feat_name]
for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
edge_feature_start = perf_counter()
Expand All @@ -1584,24 +1570,35 @@ def write_feature(self, feat_name, single_feature_df, edge_type, transformer_nam
):
for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
single_feature_df = transformed_feature_df.select(bert_feat_name)
write_feature(
self,
feature_output_path = os.path.join(
self.output_prefix,
f"edge_data/{edge_type}-{bert_feat_name}",
)
feat_meta, feat_size = self._write_processed_feature(
bert_feat_name,
single_feature_df,
edge_type,
transformer.get_transformation_name(),
feature_output_path,
)
edge_type_feature_metadata[bert_feat_name] = feat_meta
etype_feat_sizes.update({bert_feat_name: feat_size})
else:
single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
feat_col, feat_name
)
write_feature(
self,
feature_output_path = os.path.join(
self.output_prefix, f"edge_data/{edge_type}-{feat_name}"
)
feat_meta, feat_size = self._write_processed_feature(
feat_name,
single_feature_df,
edge_type,
transformer.get_transformation_name(),
feature_output_path,
)
edge_feature_metadata_dicts[feat_name] = feat_meta
etype_feat_sizes.update({feat_name: feat_size})

self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = (
perf_counter() - edge_feature_start
)

# Unpersist and move on to next feature
transformed_feature_df.unpersist()
Expand Down

0 comments on commit ae8bf55

Please sign in to comment.