diff --git a/src/graphnet/models/graphs/nodes/nodes.py b/src/graphnet/models/graphs/nodes/nodes.py index e8f8d749d..a8e4e123a 100644 --- a/src/graphnet/models/graphs/nodes/nodes.py +++ b/src/graphnet/models/graphs/nodes/nodes.py @@ -9,7 +9,7 @@ from graphnet.utilities.decorators import final from graphnet.models import Model from graphnet.models.graphs.utils import ( - cluster_summarize_with_percentiles, + cluster_and_pad, identify_indices, lex_sort, ice_transparency, @@ -169,9 +169,7 @@ def _define_output_feature_names( cluster_idx, summ_idx, new_feature_names, - ) = self._get_indices_and_feature_names( - input_feature_names, self._add_counts - ) + ) = self._get_indices_and_feature_names(input_feature_names) self._cluster_indices = cluster_idx self._summarization_indices = summ_idx return new_feature_names @@ -179,7 +177,6 @@ def _define_output_feature_names( def _get_indices_and_feature_names( self, feature_names: List[str], - add_counts: bool, ) -> Tuple[List[int], List[int], List[str]]: cluster_idx, summ_idx, summ_names = identify_indices( feature_names, self._cluster_on @@ -188,7 +185,7 @@ def _get_indices_and_feature_names( for feature in summ_names: for pct in self._percentiles: new_feature_names.append(f"{feature}_pct{pct}") - if add_counts: + if self._add_counts: # add "counts" as the last feature new_feature_names.append("counts") return cluster_idx, summ_idx, new_feature_names @@ -198,13 +195,16 @@ def _construct_nodes(self, x: torch.Tensor) -> Data: x = x.numpy() # Construct clusters with percentile-summarized features if hasattr(self, "_summarization_indices"): - array = cluster_summarize_with_percentiles( - x=x, + cluster_class = cluster_and_pad( + x=x, cluster_columns=self._cluster_indices + ) + cluster_class.add_percentile_summary( summarization_indices=self._summarization_indices, - cluster_indices=self._cluster_indices, percentiles=self._percentiles, - add_counts=self._add_counts, ) + if self._add_counts: + cluster_class.add_counts() + array = cluster_class.clustered_x else: self.error( f"""{self.__class__.__name__} was not instatiated with diff --git a/src/graphnet/models/graphs/utils.py b/src/graphnet/models/graphs/utils.py index 77669eaeb..7bb746508 100644 --- a/src/graphnet/models/graphs/utils.py +++ b/src/graphnet/models/graphs/utils.py @@ -1,6 +1,6 @@ """Utility functions for construction of graphs.""" -from typing import List, Tuple, Optional +from typing import List, Tuple, Optional, Union import os import numpy as np import pandas as pd @@ -113,6 +113,8 @@ def identify_indices( return cluster_indices, summarization_indices, features_for_summarization +# TODO Remove this function as it is superseded by +# cluster_and_pad wich has the same functionality def cluster_summarize_with_percentiles( x: np.ndarray, summarization_indices: List[int], @@ -172,6 +174,319 @@ def cluster_summarize_with_percentiles( return array +class cluster_and_pad: + """Cluster and pad the data for further summarization. + + Clusters the inptut data according to the specified columns + and computes aggregate statistics on the clusters. + The clustering will happen only ones creating a cluster matrix + which will hold all the aggregated statistics and a padded matrix which + will hold the padded data for quick calculation of aggregate statistics. + + Example: + cluster_and_pad(x = single_event_as_array, + cluster_columns = [0,1,2]) + # Creates a cluster matrix and a padded matrix, + # the cluster matrix will contain the unique values of the cluster columns, + # no additional aggregate statistics are added yet. + + cluster_class.add_percentile_summary(summarization_indices = [3,4,5], + percentiles = [10,50,90]) + # Adds the 10th, 50th and 90th percentile of columns 3,4 + # and 5 in the input data to the cluster matrix. + + cluster_class.add_std(column = 4) + # Adds the standard deviation of column 4 in the input data + # to the cluster matrix. + x = cluster_class.clustered_x + # Gets the clustered matrix with all the aggregate statistics. + """ + + def __init__( + self, + x: np.ndarray, + cluster_columns: List[int], + input_names: Optional[List[str]] = None, + ) -> None: + """Initialize the class with the data and cluster columns. + + Args: + x: Array to be clustered + cluster_columns: List of column indices on which the clusters + are constructed. + input_names: Names of the columns in the input data for automatic + generation of names. + Adds: + clustered_x: Added to the class + _counts: Added to the class + _padded_x: Added to the class + """ + x = lex_sort(x=x, cluster_columns=cluster_columns) + + unique_sensors, self._counts = np.unique( + x[:, cluster_columns], axis=0, return_counts=True + ) + + contingency_table = np.concatenate( + [unique_sensors, self._counts.reshape(-1, 1)], axis=1 + ) + + contingency_table = lex_sort( + x=contingency_table, cluster_columns=cluster_columns + ) + + self.clustered_x = contingency_table[:, 0 : unique_sensors.shape[1]] + self._counts = ( + contingency_table[:, self.clustered_x.shape[1] :] + .flatten() + .astype(int) + ) + + self._padded_x = np.empty( + (len(self._counts), max(self._counts), x.shape[1]) + ) + self._padded_x.fill(np.nan) + + for i in range(len(self._counts)): + self._padded_x[i, : self._counts[i]] = x[: self._counts[i]] + x = x[self._counts[i] :] + + self._input_names = input_names + if self._input_names is not None: + assert ( + len(self._input_names) == x.shape[1] + ), "The input names must have the same length as the input data" + + self._cluster_names = np.array(input_names)[cluster_columns] + + def _add_column( + self, column: np.ndarray, location: Optional[int] = None + ) -> None: + """Add a column to the clustered tensor. + + Args: + column: Column to be added to the tensor + location: Location to insert the column in the clustered tensor. + Altered: + clustered_x: The column is added at the end of the tenor or + inserted at the specified location + """ + if location is None: + self.clustered_x = np.column_stack([self.clustered_x, column]) + else: + self.clustered_x = np.insert( + self.clustered_x, location, column, axis=1 + ) + + def _add_column_names( + self, names: List[str], location: Optional[int] = None + ) -> None: + """Add names to the columns of the clustered tensor. + + Args: + names: Names to be added to the columns of the tensor + location: Location to insert the names in the clustered tensor + Altered: + _cluster_names: The names are added at the end of the tensor + or inserted at the specified location + """ + if location is None: + self._cluster_names = np.append(self._cluster_names, names) + else: + self._cluster_names = np.insert( + self._cluster_names, location, names + ) + + def _calculate_charge_sum(self, charge_index: int) -> np.ndarray: + """Calculate the sum of the charge.""" + assert not hasattr( + self, "_charge_sum" + ), "Charge sum has already been calculated, \ + re-calculation is not allowed" + self._charge_sum = self._padded_x[:, :, charge_index].sum(axis=1) + + def _calculate_charge_weights(self, charge_index: int) -> np.ndarray: + """Calculate the weights of the charge.""" + assert not hasattr( + self, "_charge_weights" + ), "Charge weights have already been calculated, \ + re-calculation is not allowed" + assert hasattr( + self, "_charge_sum" + ), "Charge sum has not been calculated, \ + please run calculate_charge_sum" + self._charge_weights = ( + self._padded_x[:, :, charge_index] + / self._charge_sum[:, np.newaxis] + ) + + def add_charge_threshold_summary( + self, + summarization_indices: List[int], + percentiles: List[int], + charge_index: int, + location: Optional[int] = None, + ) -> np.ndarray: + """Summarize features through percentiles on charge of sensor. + + Args: + summarization_indices: List of column indices that defines features + that will be summarized with percentiles. + percentiles: percentiles used to summarize `x`. E.g. [10,50,90]. + charge_index: index of the charge column in the padded tensor + location: Location to insert the summarization indices in the + clustered tensor defaults to adding at the end + Adds: + _charge_sum: Added to the class + _charge_weights: Added to the class + Altered: + _padded_x: Charge is altered to be the cumulative sum + of the charge divided by the total charge + clustered_x: The summarization indices are added at the end + of the tensor or inserted at the specified location. + _cluster_names: The names are added at the end of the tensor + or inserted at the specified location + """ + # convert the charge to the cumulative sum of the charge divided + # by the total charge + self._calculate_charge_sum(charge_index) + self._calculate_charge_weights(charge_index) + + self._padded_x[:, :, charge_index] = ( + self._padded_x[:, :, charge_index] + / self._charge_sum[:, np.newaxis] + ) + + # Summarize the charge at different percentiles + selections = np.argmax( + self._padded_x[:, :, charge_index][:, :, np.newaxis] + >= (np.array(percentiles) / 100), + axis=1, + ) + + selections += (np.arange(len(self._counts)) * self._padded_x.shape[1])[ + :, np.newaxis + ] + + selections = self._padded_x[:, :, summarization_indices].reshape( + -1, len(summarization_indices) + )[selections] + selections = selections.transpose(0, 2, 1).reshape( + len(self.clustered_x), -1 + ) + self._add_column(selections, location) + + # update the cluster names + if self._input_names is not None: + new_names = [ + self._input_names[i] + "_charge_threshold_" + str(p) + for i in summarization_indices + for p in percentiles + ] + self._add_column_names(new_names, location) + + def add_percentile_summary( + self, + summarization_indices: List[int], + percentiles: List[int], + method: str = "linear", + location: Optional[int] = None, + ) -> np.ndarray: + """Summarize the features of the sensors using percentiles. + + Args: + summarization_indices: List of column indices that defines features + that will be summarized with percentiles. + percentiles: percentiles used to summarize `x`. E.g. [10,50,90]. + method: Method to summarize the features. E.g. "linear" + location: Location to insert the summarization indices in the + clustered tensor defaults to adding at the end + Altered: + clustered_x: The summarization indices are added at the end of + the tensor or inserted at the specified location + _cluster_names: The names are added at the end of the tensor + or inserted at the specified location + """ + percentiles_x = np.nanpercentile( + self._padded_x[:, :, summarization_indices], + percentiles, + axis=1, + method=method, + ) + + percentiles_x = percentiles_x.transpose(1, 2, 0).reshape( + len(self.clustered_x), -1 + ) + self._add_column(percentiles_x, location) + + # update the cluster names + if self._input_names is not None: + new_names = [ + self._input_names[i] + "_percentile_" + str(p) + for i in summarization_indices + for p in percentiles + ] + self._add_column_names(new_names, location) + + def add_counts(self, location: Optional[int] = None) -> np.ndarray: + """Add the counts of the sensor to the summarization features.""" + self._add_column(np.log10(self._counts), location) + if self._input_names is not None: + new_name = ["counts"] + self._add_column_names(new_name, location) + + def add_sum_charge( + self, charge_index: int, location: Optional[int] = None + ) -> np.ndarray: + """Add the sum of the charge to the summarization features.""" + if not hasattr(self, "_charge_sum"): + self._calculate_charge_sum(charge_index) + self._add_column(self._charge_sum, location) + # update the cluster names + if self._input_names is not None: + new_name = [self._input_names[charge_index] + "_sum"] + self._add_column_names(new_name, location) + + def add_std( + self, + columns: List[int], + location: Optional[int] = None, + weights: Union[np.ndarray, int] = 1, + ) -> np.ndarray: + """Add the standard deviation of the column. + + Args: + columns: Index of the columns from which to calculate the standard + deviation. + location: Location to insert the standard deviation in the + clustered tensor defaults to adding at the end + weights: Optional weights to be applied to the standard deviation + """ + self._add_column( + np.nanstd(self._padded_x[:, :, columns] * weights, axis=1), + location, + ) + if self._input_names is not None: + new_names = [self._input_names[i] + "_std" for i in columns] + self._add_column_names(new_names, location) + + def add_mean( + self, + columns: List[int], + location: Optional[int] = None, + weights: Union[np.ndarray, int] = 1, + ) -> np.ndarray: + """Add the mean of the column.""" + self._add_column( + np.nanmean(self._padded_x[:, :, columns] * weights, axis=1), + location, + ) + # update the cluster names + if self._input_names is not None: + new_names = [self._input_names[i] + "_mean" for i in columns] + self._add_column_names(new_names, location) + + def ice_transparency( z_offset: Optional[float] = None, z_scaling: Optional[float] = None ) -> Tuple[interp1d, interp1d]: