[GSProcessing] Fix isinstance check for numerical features (#955)
*Issue #, if available:*

*Description of changes:*

* Fixes a corner case where a feature value might not be an instance of
`int`/`float` (we don't cast Parquet input types) by replacing the check
with the more general `numbers.Number`.

Example of how to trigger a failure, as observed by a customer:

```python
>>> import numbers
>>> import decimal
>>> dn = decimal.Decimal(5)
>>> intnum = 5
>>> isinstance(intnum, (float, int))
True
>>> isinstance(dn, (float, int))
False
>>> isinstance(dn, numbers.Number)
True
```
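For reference, the generalized check is what the new `_get_feat_size` helper (added in the diff below) relies on: any scalar registered with the `numbers.Number` ABC passes. Here is a minimal standalone sketch of that logic; the helper body mirrors the diff, while the top-level function name and test values are mine:

```python
import numbers
from decimal import Decimal


def get_feat_size(feat_val) -> int:
    """Return the dimension of a feature value: 1 for scalars, length for lists."""
    assert isinstance(
        feat_val, (list, numbers.Number)
    ), f"We expect features to either be scalars or lists of scalars, got {type(feat_val)}."
    if isinstance(feat_val, list):
        for val in feat_val:
            assert isinstance(
                val, numbers.Number
            ), f"We expect feature lists to be lists of scalars, got {type(val)}."
    return 1 if isinstance(feat_val, numbers.Number) else len(feat_val)


print(get_feat_size(Decimal(5)))                # 1 -- the old (int, float) check fell
                                                #      through to len() and raised here
print(get_feat_size([Decimal(5), Decimal(7)]))  # 2
print(get_feat_size(3.14))                      # 1
```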


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
thvasilo authored Aug 10, 2024
1 parent 9bafcdf commit fbad0c7
Showing 1 changed file with 74 additions and 71 deletions.
@@ -1007,6 +1007,42 @@ def process_node_data(self, node_configs: Sequence[NodeConfig]) -> Dict:
         logging.info("Finished processing node features")
         return node_data_dict

+    def _write_processed_feature(
+        self,
+        feat_name: str,
+        single_feature_df: DataFrame,
+        feature_output_path: str,
+    ) -> tuple[dict, int]:
+
+        def _get_feat_size(feat_val) -> int:
+
+            assert isinstance(
+                feat_val, (list, numbers.Number)
+            ), f"We expect features to either be scalars or lists of scalars, got {type(feat_val)}."
+
+            if isinstance(feat_val, list):
+                for val in feat_val:
+                    assert isinstance(
+                        val, numbers.Number
+                    ), f"We expect feature lists to be lists of scalars, got {type(val)}."
+
+            nfeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
+
+            return nfeat_size
+
+        logging.info("Writing output for feat_name: '%s' to %s", feat_name, feature_output_path)
+        path_list = self._write_df(single_feature_df, feature_output_path, out_format=FORMAT_NAME)
+
+        node_feature_metadata_dict = {
+            "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
+            "data": path_list,
+        }
+
+        feat_val = single_feature_df.take(1)[0].asDict().get(feat_name)
+        nfeat_size = _get_feat_size(feat_val)
+
+        return node_feature_metadata_dict, nfeat_size
+
     def _process_node_features(
         self, feature_configs: Sequence[FeatureConfig], nodes_df: DataFrame, node_type: str
     ) -> Tuple[Dict, Dict]:
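The first hunk adds the shared `_write_processed_feature` helper. The value it inspects comes straight out of a Spark `Row`, and Spark hands back `DecimalType` Parquet columns as `decimal.Decimal` objects rather than Python floats, which is how the customer hit the old check. A minimal sketch of that behavior, assuming a local PySpark session (the column name and precision are illustrative):

```python
import numbers
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Stand-in for an un-cast Parquet decimal column read by GSProcessing
schema = StructType([StructField("feat", DecimalType(10, 2), True)])
df = spark.createDataFrame([(Decimal("5.00"),)], schema=schema)

feat_val = df.take(1)[0].asDict().get("feat")
print(type(feat_val))                        # <class 'decimal.Decimal'>
print(isinstance(feat_val, (int, float)))    # False -- the old check missed this
print(isinstance(feat_val, numbers.Number))  # True  -- the new check accepts it
```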
@@ -1063,62 +1099,43 @@ def _process_node_features(
                     feat_conf.feat_name
                 ] = json_representation
-
-            def write_processed_feature(
-                feat_name: str,
-                single_feature_df: DataFrame,
-                node_type: str,
-                transformer: DistFeatureTransformer,
-            ):
-                feature_output_path = os.path.join(
-                    self.output_prefix, f"node_data/{node_type}-{feat_name}"
-                )
-                logging.info(
-                    "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
-                )
-                path_list = self._write_df(
-                    single_feature_df, feature_output_path, out_format=FORMAT_NAME
-                )
-
-                node_feature_metadata_dict = {
-                    "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
-                    "data": path_list,
-                }
-                node_type_feature_metadata[feat_name] = node_feature_metadata_dict
-
-                feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None)
-                nfeat_size = 1 if isinstance(feat_val, (int, float)) else len(feat_val)
-                ntype_feat_sizes.update({feat_name: nfeat_size})
-
-                self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
-                    perf_counter() - node_transformation_start
-                )
-
             # TODO: Remove hack with [feat_conf.feat_name]
             for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
                 node_transformation_start = perf_counter()
-
                 if (
                     feat_conf.feat_type == HUGGINGFACE_TRANFORM
                     and feat_conf.transformation_kwargs["action"] == HUGGINGFACE_TOKENIZE
                 ):
+
                     for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
                         single_feature_df = transformed_feature_df.select(bert_feat_name)
-                        write_processed_feature(
+                        feature_output_path = os.path.join(
+                            self.output_prefix,
+                            f"node_data/{node_type}-{bert_feat_name}",
+                        )
+                        feat_meta, feat_size = self._write_processed_feature(
                             bert_feat_name,
                             single_feature_df,
-                            node_type,
-                            transformer,
+                            feature_output_path,
                         )
+                        node_type_feature_metadata[bert_feat_name] = feat_meta
+                        ntype_feat_sizes.update({bert_feat_name: feat_size})
                 else:
                     single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
                         feat_col, feat_name
                     )
-                    write_processed_feature(
+                    feature_output_path = os.path.join(
+                        self.output_prefix, f"node_data/{node_type}-{feat_name}"
+                    )
+                    feat_meta, feat_size = self._write_processed_feature(
                         feat_name,
                         single_feature_df,
-                        node_type,
-                        transformer,
+                        feature_output_path,
                     )
+                    node_type_feature_metadata[feat_name] = feat_meta
+                    ntype_feat_sizes.update({feat_name: feat_size})
+                self.timers[f"{transformer.get_transformation_name()}-{node_type}-{feat_name}"] = (
+                    perf_counter() - node_transformation_start
+                )

                 # Unpersist and move on to next feature
                 transformed_feature_df.unpersist()
@@ -1543,31 +1560,6 @@ def _process_edge_features(
                     feat_conf.feat_name
                 ] = json_representation

-            def write_feature(self, feat_name, single_feature_df, edge_type, transformer_name):
-                feature_output_path = os.path.join(
-                    self.output_prefix, f"edge_data/{edge_type.replace(':', '_')}-{feat_name}"
-                )
-                logging.info(
-                    "Writing output for feat_name: '%s' to %s", feat_name, feature_output_path
-                )
-                path_list = self._write_df(
-                    single_feature_df, feature_output_path, out_format=FORMAT_NAME
-                )
-
-                edge_feature_metadata_dict = {
-                    "format": {"name": FORMAT_NAME, "delimiter": DELIMITER},
-                    "data": path_list,
-                }
-                edge_feature_metadata_dicts[feat_name] = edge_feature_metadata_dict
-
-                feat_val = single_feature_df.take(1)[0].asDict().get(feat_name, None)
-                efeat_size = 1 if isinstance(feat_val, numbers.Number) else len(feat_val)
-                etype_feat_sizes.update({feat_name: efeat_size})
-
-                self.timers[f"{transformer_name}-{edge_type}-{feat_name}"] = (
-                    perf_counter() - edge_feature_start
-                )
-
             # TODO: Remove hack with [feat_conf.feat_name]
             for feat_name, feat_col in zip([feat_conf.feat_name], feat_conf.cols):
                 edge_feature_start = perf_counter()
@@ -1578,24 +1570,35 @@ def write_feature(self, feat_name, single_feature_df, edge_type, transformer_name):
                 ):
                     for bert_feat_name in ["input_ids", "attention_mask", "token_type_ids"]:
                         single_feature_df = transformed_feature_df.select(bert_feat_name)
-                        write_feature(
-                            self,
+                        feature_output_path = os.path.join(
+                            self.output_prefix,
+                            f"edge_data/{edge_type}-{bert_feat_name}",
+                        )
+                        feat_meta, feat_size = self._write_processed_feature(
                             bert_feat_name,
                             single_feature_df,
-                            edge_type,
-                            transformer.get_transformation_name(),
+                            feature_output_path,
                         )
+                        edge_type_feature_metadata[bert_feat_name] = feat_meta
+                        etype_feat_sizes.update({bert_feat_name: feat_size})
                 else:
                     single_feature_df = transformed_feature_df.select(feat_col).withColumnRenamed(
                         feat_col, feat_name
                     )
-                    write_feature(
-                        self,
+                    feature_output_path = os.path.join(
+                        self.output_prefix, f"edge_data/{edge_type}-{feat_name}"
+                    )
+                    feat_meta, feat_size = self._write_processed_feature(
                         feat_name,
                         single_feature_df,
-                        edge_type,
-                        transformer.get_transformation_name(),
+                        feature_output_path,
                     )
+                    edge_feature_metadata_dicts[feat_name] = feat_meta
+                    etype_feat_sizes.update({feat_name: feat_size})

+                self.timers[f"{transformer.get_transformation_name()}-{edge_type}-{feat_name}"] = (
+                    perf_counter() - edge_feature_start
+                )
+
                 # Unpersist and move on to next feature
                 transformed_feature_df.unpersist()
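With both call sites migrated, node and edge features flow through the same helper and the same `numbers.Number`-based size check. For orientation, a rough sketch of the `(metadata, size)` tuple the helper returns; the concrete `FORMAT_NAME`/`DELIMITER` values and file names below are illustrative assumptions, not taken from this diff:

```python
# Hypothetical return of _write_processed_feature("age", df, "node_data/user-age")
feat_meta = {
    "format": {"name": "parquet", "delimiter": ""},  # FORMAT_NAME / DELIMITER (assumed values)
    "data": [  # path_list returned by _write_df (illustrative file names)
        "node_data/user-age/part-00000.parquet",
        "node_data/user-age/part-00001.parquet",
    ],
}
feat_size = 1  # scalar feature; a list-valued feature reports the list's length
```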
