class cluster_and_pad:
    """Cluster rows of an array and pad the clusters to a common length.

    Rows of `x` that share the same values in `cluster_columns` form one
    cluster. The rows of every cluster are stored in a NaN-padded 3D array
    so that per-cluster summaries can be computed with vectorized, NaN-aware
    numpy reductions and appended as columns to `clustered_x`.

    Attributes:
        clustered_x: One row per cluster. Initially the unique values of the
            cluster columns; every `add_*` method adds column(s) to it.
        _counts: Number of rows of `x` belonging to each cluster.
        _padded_x: Array of shape [n_clusters, max(_counts), x.shape[1]]
            holding the rows of each cluster, padded at the end with NaN.
        _charge_sum: Total charge per cluster (only set after
            `calculate_charge_sum` or `add_charge_threshold_summary`).
        _charge_weights: Charge of each row divided by the total charge of
            its cluster (only set after `calculate_charge_weights` or
            `add_charge_threshold_summary`).
    """

    def __init__(self, x: np.ndarray, cluster_columns: List[int]) -> None:
        """Cluster `x` on `cluster_columns` and build the padded array.

        Args:
            x: Array to be clustered.
            cluster_columns: List of column indices on which the clusters
                are constructed.
        """
        # `lex_sort` is defined elsewhere in this module; presumably it
        # orders the rows such that members of a cluster are contiguous,
        # which the slicing at the end of this method relies on.
        x = lex_sort(x=x, cluster_columns=cluster_columns)

        unique_sensors, counts = np.unique(
            x[:, cluster_columns], axis=0, return_counts=True
        )
        n_cluster_columns = unique_sensors.shape[1]

        # Re-sort the (cluster, count) table with the same routine used for
        # `x`, so that row i of the table describes the i'th cluster of `x`.
        # The table only holds the cluster columns (at positions
        # 0..n_cluster_columns-1) followed by the counts, hence it must be
        # indexed by position and not by the column indices of `x`.
        contingency_table = lex_sort(
            x=np.concatenate([unique_sensors, counts.reshape(-1, 1)], axis=1),
            cluster_columns=list(range(n_cluster_columns)),
        )

        self.clustered_x = contingency_table[:, :n_cluster_columns]
        self._counts = (
            contingency_table[:, n_cluster_columns:].flatten().astype(int)
        )

        self._padded_x = np.full(
            (len(self._counts), max(self._counts), x.shape[1]), np.nan
        )

        # Copy each contiguous run of rows into its own padded slot.
        stops = np.cumsum(self._counts)
        starts = stops - self._counts
        for row, (start, stop) in enumerate(zip(starts, stops)):
            self._padded_x[row, : stop - start] = x[start:stop]

    def _add_column(
        self, column: np.ndarray, location: Optional[int] = None
    ) -> None:
        """Add one or more columns to the clustered tensor.

        Args:
            column: Values to add; either a 1D array with one entry per
                cluster or a 2D array of shape [n_clusters, n_new_columns].
            location: Column position in `clustered_x` before which the new
                column(s) are inserted. Defaults to appending at the end.
        """
        column = np.asarray(column)
        if column.ndim == 1:
            column = column[:, np.newaxis]

        if location is None:
            self.clustered_x = np.column_stack([self.clustered_x, column])
        else:
            # A scalar index makes `np.insert` move the axes of the values,
            # which breaks for blocks with more than one column. A
            # one-element index sequence inserts the 2D block as-is.
            self.clustered_x = np.insert(
                self.clustered_x, [location], column, axis=1
            )

    def add_charge_threshold_summary(
        self,
        summarization_indices: List[int],
        percentiles: List[int],
        charge_index: int,
        location: Optional[int] = None,
    ) -> np.ndarray:
        """Summarize features through percentiles on charge of sensor.

        For every cluster and every percentile, the first row at which the
        normalized cumulative charge reaches the percentile is selected and
        the values of `summarization_indices` in that row are added to
        `clustered_x`, ordered feature by feature.

        Note that this overwrites the charge column of `_padded_x` with the
        normalized cumulative charge, so it should only be called once per
        instance.

        Args:
            summarization_indices: List of column indices that defines
                features that will be summarized with percentiles.
            percentiles: percentiles used to summarize `x`. E.g. [10,50,90].
            charge_index: index of the charge column in the padded tensor
            location: Location to insert the summarization indices in the
                clustered tensor defaults to adding at the end

        Returns:
            clustered_x: The clustered tensor with the summarization indices
                added

        Adds:
            _charge_sum: Added to the class (if not already present)
            _charge_weights: Added to the class

        Altered:
            _padded_x: Charge is altered to be the cumulative sum
                of the charge divided by the total charge
            clustered_x: The summarized features are added
        """
        # Copy the raw charge: a plain slice is a view into `_padded_x` and
        # would be clobbered by the in-place overwrite further down.
        raw_charge = self._padded_x[:, :, charge_index].copy()
        cumulative_charge = raw_charge.cumsum(axis=1)

        # add the charge sum to the class if it does not already exist.
        # The padding is NaN, so the largest non-NaN cumulative value is the
        # total, and dividing by it makes the last valid entry exactly 1.
        if not hasattr(self, "_charge_sum"):
            self._charge_sum = np.nanmax(cumulative_charge, axis=1)
        total_charge = self._charge_sum[:, np.newaxis]

        self._charge_weights = raw_charge / total_charge
        self._padded_x[:, :, charge_index] = cumulative_charge / total_charge

        # Index of the first row per cluster whose normalized cumulative
        # charge reaches each threshold; shape [n_clusters, n_percentiles].
        thresholds = np.array(percentiles) / 100
        first_hit = np.argmax(
            self._padded_x[:, :, charge_index][:, :, np.newaxis]
            >= thresholds,
            axis=1,
        )

        cluster_rows = np.arange(self._padded_x.shape[0])[:, np.newaxis]
        selections = self._padded_x[cluster_rows, first_hit][
            :, :, summarization_indices
        ]
        # [n_clusters, n_percentiles, n_features] -> feature-major columns.
        selections = selections.transpose(0, 2, 1).reshape(
            len(self.clustered_x), -1
        )
        self._add_column(selections, location)
        return self.clustered_x

    def add_percentile_summary(
        self,
        summarization_indices: List[int],
        percentiles: List[int],
        method: str = "linear",
        location: Optional[int] = None,
    ) -> np.ndarray:
        """Summarize the features of the sensors using percentiles.

        Args:
            summarization_indices: List of column indices that defines
                features that will be summarized with percentiles.
            percentiles: percentiles used to summarize `x`. E.g. [10,50,90].
            method: Interpolation method passed on to `np.nanpercentile`.
                E.g. "linear"
            location: Location to insert the summarization indices in the
                clustered tensor defaults to adding at the end

        Returns:
            clustered_x: The clustered tensor with the percentiles added,
                ordered feature by feature.

        Altered:
            clustered_x: The percentile columns are added
        """
        # Shape [n_percentiles, n_clusters, n_features]; NaN padding is
        # ignored by `nanpercentile`.
        percentiles_x = np.nanpercentile(
            self._padded_x[:, :, summarization_indices],
            percentiles,
            axis=1,
            method=method,
        )

        percentiles_x = percentiles_x.transpose(1, 2, 0).reshape(
            len(self.clustered_x), -1
        )
        self._add_column(percentiles_x, location)
        return self.clustered_x

    def add_counts(self, location: int) -> np.ndarray:
        """Add log10 of the per-cluster row counts to `clustered_x`.

        Args:
            location: Location to insert the column in the clustered tensor

        Returns:
            clustered_x: The clustered tensor with the column added
        """
        self._add_column(np.log10(self._counts), location)
        return self.clustered_x

    def calculate_charge_sum(self, charge_index: int) -> np.ndarray:
        """Calculate the sum of the charge of every cluster.

        Args:
            charge_index: index of the charge column in the padded tensor

        Returns:
            _charge_sum: The total charge per cluster
        """
        assert not hasattr(
            self, "_charge_sum"
        ), "Charge sum has already been calculated, re-calculation is not allowed"
        # NaN-aware sum: clusters shorter than the longest one are padded
        # with NaN, which a plain sum would propagate.
        self._charge_sum = np.nansum(
            self._padded_x[:, :, charge_index], axis=1
        )
        return self._charge_sum

    def calculate_charge_weights(self, charge_index: int) -> np.ndarray:
        """Calculate the charge of each row relative to its cluster total.

        Args:
            charge_index: index of the charge column in the padded tensor

        Returns:
            _charge_weights: Array of shape [n_clusters, max(_counts)];
                padded entries are NaN.
        """
        assert not hasattr(
            self, "_charge_weights"
        ), "Charge weights have already been calculated, re-calculation is not allowed"
        assert hasattr(
            self, "_charge_sum"
        ), "Charge sum has not been calculated, please run calculate_charge_sum"
        self._charge_weights = (
            self._padded_x[:, :, charge_index]
            / self._charge_sum[:, np.newaxis]
        )
        return self._charge_weights

    def add_sum_charge(self, location: int) -> np.ndarray:
        """Add the sum of the charge to the summarization features.

        Args:
            location: Location to insert the column in the clustered tensor

        Returns:
            clustered_x: The clustered tensor with the column added
        """
        assert hasattr(
            self, "_charge_sum"
        ), "Charge sum has not been calculated, please run calculate_charge_sum"
        self._add_column(self._charge_sum, location)
        return self.clustered_x

    def add_std(
        self,
        column: int,
        location: Optional[int] = None,
        weights: Union[np.ndarray, int] = 1,
    ) -> np.ndarray:
        """Add the standard deviation of the column.

        Args:
            column: Index of the column in the padded tensor to calculate
                the standard deviation
            location: Location to insert the standard deviation in the
                clustered tensor defaults to adding at the end
            weights: Optional factor(s) the column is multiplied with before
                the standard deviation is taken

        Returns:
            clustered_x: The clustered tensor with the column added
        """
        self._add_column(
            np.nanstd(self._padded_x[:, :, column] * weights, axis=1), location
        )
        return self.clustered_x

    def add_mean(
        self, column: int, location: int, weights: Union[np.ndarray, int] = 1
    ) -> np.ndarray:
        """Add the mean of the column.

        Args:
            column: Index of the column in the padded tensor to average
            location: Location to insert the mean in the clustered tensor
            weights: Optional factor(s) the column is multiplied with before
                the mean is taken

        Returns:
            clustered_x: The clustered tensor with the column added
        """
        self._add_column(
            np.nanmean(self._padded_x[:, :, column] * weights, axis=1),
            location,
        )
        return self.clustered_x