diff --git a/graphstorm-processing/graphstorm_processing/distributed_executor.py b/graphstorm-processing/graphstorm_processing/distributed_executor.py index e4db63e6a9..332415c00c 100644 --- a/graphstorm-processing/graphstorm_processing/distributed_executor.py +++ b/graphstorm-processing/graphstorm_processing/distributed_executor.py @@ -452,26 +452,33 @@ def append_transformations( """Appends the pre-computed transformations to the input dicts.""" assert structure_type in ["edge", "node"] for input_dict in structure_input_dicts: + # type_name is the name of either a node type or edge type type_name = get_structure_type(input_dict, structure_type) # If we have pre-computed transformations for this type if type_name in structure_transforms: + # type_transforms holds the transformation representations for + # every feature that has one for type_name, from feature name to + # feature representation dict. type_transforms: Mapping[str, Mapping] = structure_transforms[type_name] assert ( "features" in input_dict ), f"Expected type {type_name} to have have features in the input config" - # Iterate over every feature of the type, - # and append representation if one exists + # Iterate over every feature for the node/edge type, + # and append representation to its input dict, if one exists for type_feat_dict in input_dict["features"]: + # We take a feature's name either explicitly if it exists, + # or from the column name otherwise. feat_name = ( type_feat_dict["name"] if "name" in type_feat_dict else type_feat_dict["column"] ) if feat_name in type_transforms: - type_feat_dict["precomputed_transformation"] = type_transforms[ - feat_name - ] + # Feature representation needs to contain all the + # necessary information to re-apply the feature transformation + feature_representation = type_transforms[feat_name] + type_feat_dict["precomputed_transformation"] = feature_representation if edge_transformations: append_transformations(edge_input_dicts, edge_transformations, "edge")