Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustering utilities #770

Merged
merged 19 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/graphnet/models/graphs/nodes/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from graphnet.utilities.decorators import final
from graphnet.models import Model
from graphnet.models.graphs.utils import (
cluster_summarize_with_percentiles,
cluster_and_pad,
identify_indices,
lex_sort,
ice_transparency,
Expand Down Expand Up @@ -169,17 +169,14 @@ def _define_output_feature_names(
cluster_idx,
summ_idx,
new_feature_names,
) = self._get_indices_and_feature_names(
input_feature_names, self._add_counts
)
) = self._get_indices_and_feature_names(input_feature_names)
self._cluster_indices = cluster_idx
self._summarization_indices = summ_idx
return new_feature_names

def _get_indices_and_feature_names(
self,
feature_names: List[str],
add_counts: bool,
) -> Tuple[List[int], List[int], List[str]]:
cluster_idx, summ_idx, summ_names = identify_indices(
feature_names, self._cluster_on
Expand All @@ -188,7 +185,7 @@ def _get_indices_and_feature_names(
for feature in summ_names:
for pct in self._percentiles:
new_feature_names.append(f"{feature}_pct{pct}")
if add_counts:
if self._add_counts:
# add "counts" as the last feature
new_feature_names.append("counts")
return cluster_idx, summ_idx, new_feature_names
Expand All @@ -198,13 +195,16 @@ def _construct_nodes(self, x: torch.Tensor) -> Data:
x = x.numpy()
# Construct clusters with percentile-summarized features
if hasattr(self, "_summarization_indices"):
array = cluster_summarize_with_percentiles(
x=x,
cluster_class = cluster_and_pad(
x=x, cluster_columns=self._cluster_indices
)
cluster_class.add_percentile_summary(
summarization_indices=self._summarization_indices,
cluster_indices=self._cluster_indices,
percentiles=self._percentiles,
add_counts=self._add_counts,
)
if self._add_counts:
cluster_class.add_counts()
array = cluster_class.clustered_x
else:
self.error(
f"""{self.__class__.__name__} was not instatiated with
Expand Down
317 changes: 316 additions & 1 deletion src/graphnet/models/graphs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Utility functions for construction of graphs."""

from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, Union
import os
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -113,6 +113,8 @@ def identify_indices(
return cluster_indices, summarization_indices, features_for_summarization


# TODO Remove this function as it is superseded by
# cluster_and_pad wich has the same functionality
def cluster_summarize_with_percentiles(
x: np.ndarray,
summarization_indices: List[int],
Expand Down Expand Up @@ -172,6 +174,319 @@ def cluster_summarize_with_percentiles(
return array


class cluster_and_pad:
"""Cluster and pad the data for further summarization.

Clusters the inptut data according to the specified columns
and computes aggregate statistics on the clusters.
The clustering will happen only ones creating a cluster matrix
which will hold all the aggregated statistics and a padded matrix which
will hold the padded data for quick calculation of aggregate statistics.

Example:
cluster_and_pad(x = single_event_as_array,
cluster_columns = [0,1,2])
# Creates a cluster matrix and a padded matrix,
# the cluster matrix will contain the unique values of the cluster columns,
# no additional aggregate statistics are added yet.

cluster_class.add_percentile_summary(summarization_indices = [3,4,5],
percentiles = [10,50,90])
# Adds the 10th, 50th and 90th percentile of columns 3,4
# and 5 in the input data to the cluster matrix.

cluster_class.add_std(column = 4)
# Adds the standard deviation of column 4 in the input data
# to the cluster matrix.
x = cluster_class.clustered_x
# Gets the clustered matrix with all the aggregate statistics.
"""

def __init__(
self,
x: np.ndarray,
cluster_columns: List[int],
input_names: Optional[List[str]] = None,
) -> None:
"""Initialize the class with the data and cluster columns.

Args:
x: Array to be clustered
cluster_columns: List of column indices on which the clusters
are constructed.
input_names: Names of the columns in the input data for automatic
generation of names.
Adds:
clustered_x: Added to the class
_counts: Added to the class
_padded_x: Added to the class
"""
x = lex_sort(x=x, cluster_columns=cluster_columns)

unique_sensors, self._counts = np.unique(
x[:, cluster_columns], axis=0, return_counts=True
)

contingency_table = np.concatenate(
[unique_sensors, self._counts.reshape(-1, 1)], axis=1
)

contingency_table = lex_sort(
x=contingency_table, cluster_columns=cluster_columns
)

self.clustered_x = contingency_table[:, 0 : unique_sensors.shape[1]]
self._counts = (
contingency_table[:, self.clustered_x.shape[1] :]
.flatten()
.astype(int)
)

self._padded_x = np.empty(
(len(self._counts), max(self._counts), x.shape[1])
)
self._padded_x.fill(np.nan)

for i in range(len(self._counts)):
self._padded_x[i, : self._counts[i]] = x[: self._counts[i]]
x = x[self._counts[i] :]

self._input_names = input_names
if self._input_names is not None:
assert (
len(self._input_names) == x.shape[1]
), "The input names must have the same length as the input data"

self._cluster_names = np.array(input_names)[cluster_columns]

def _add_column(
self, column: np.ndarray, location: Optional[int] = None
) -> None:
"""Add a column to the clustered tensor.

Args:
column: Column to be added to the tensor
location: Location to insert the column in the clustered tensor.
Altered:
clustered_x: The column is added at the end of the tenor or
inserted at the specified location
"""
if location is None:
self.clustered_x = np.column_stack([self.clustered_x, column])
else:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The location parameter could potentially overwrite information, right? E.g. if column 6 already contains some aggregate statistics and the user specifies location=6. Maybe a warning/error here would be useful?

Copy link
Collaborator Author

@Aske-Rosted Aske-Rosted Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The location feature is added using insert which does not remove information but instead expands the vector. The only possible confusion that could come from this is that the final ordering of the features might not be as the user expects as since every feature after the insert will have their location incremented by 1.

self.clustered_x = np.insert(
self.clustered_x, location, column, axis=1
)

def _add_column_names(
self, names: List[str], location: Optional[int] = None
) -> None:
"""Add names to the columns of the clustered tensor.

Args:
names: Names to be added to the columns of the tensor
location: Location to insert the names in the clustered tensor
Altered:
_cluster_names: The names are added at the end of the tensor
or inserted at the specified location
"""
if location is None:
self._cluster_names = np.append(self._cluster_names, names)
else:
self._cluster_names = np.insert(
self._cluster_names, location, names
)

def _calculate_charge_sum(self, charge_index: int) -> np.ndarray:
"""Calculate the sum of the charge."""
assert not hasattr(
self, "_charge_sum"
), "Charge sum has already been calculated, \
re-calculation is not allowed"
self._charge_sum = self._padded_x[:, :, charge_index].sum(axis=1)

def _calculate_charge_weights(self, charge_index: int) -> np.ndarray:
"""Calculate the weights of the charge."""
assert not hasattr(
self, "_charge_weights"
), "Charge weights have already been calculated, \
re-calculation is not allowed"
assert hasattr(
self, "_charge_sum"
), "Charge sum has not been calculated, \
please run calculate_charge_sum"
self._charge_weights = (
self._padded_x[:, :, charge_index]
/ self._charge_sum[:, np.newaxis]
)

def add_charge_threshold_summary(
self,
summarization_indices: List[int],
percentiles: List[int],
charge_index: int,
location: Optional[int] = None,
) -> np.ndarray:
"""Summarize features through percentiles on charge of sensor.

Args:
summarization_indices: List of column indices that defines features
that will be summarized with percentiles.
percentiles: percentiles used to summarize `x`. E.g. [10,50,90].
charge_index: index of the charge column in the padded tensor
location: Location to insert the summarization indices in the
clustered tensor defaults to adding at the end
Adds:
_charge_sum: Added to the class
_charge_weights: Added to the class
Altered:
_padded_x: Charge is altered to be the cumulative sum
of the charge divided by the total charge
clustered_x: The summarization indices are added at the end
of the tensor or inserted at the specified location.
_cluster_names: The names are added at the end of the tensor
or inserted at the specified location
"""
# convert the charge to the cumulative sum of the charge divided
# by the total charge
self._calculate_charge_sum(charge_index)
self._calculate_charge_weights(charge_index)

self._padded_x[:, :, charge_index] = (
self._padded_x[:, :, charge_index]
/ self._charge_sum[:, np.newaxis]
)

# Summarize the charge at different percentiles
selections = np.argmax(
self._padded_x[:, :, charge_index][:, :, np.newaxis]
>= (np.array(percentiles) / 100),
axis=1,
)

selections += (np.arange(len(self._counts)) * self._padded_x.shape[1])[
:, np.newaxis
]

selections = self._padded_x[:, :, summarization_indices].reshape(
-1, len(summarization_indices)
)[selections]
selections = selections.transpose(0, 2, 1).reshape(
len(self.clustered_x), -1
)
self._add_column(selections, location)

# update the cluster names
if self._input_names is not None:
new_names = [
self._input_names[i] + "_charge_threshold_" + str(p)
for i in summarization_indices
for p in percentiles
]
self._add_column_names(new_names, location)

def add_percentile_summary(
self,
summarization_indices: List[int],
percentiles: List[int],
method: str = "linear",
location: Optional[int] = None,
) -> np.ndarray:
"""Summarize the features of the sensors using percentiles.

Args:
summarization_indices: List of column indices that defines features
that will be summarized with percentiles.
percentiles: percentiles used to summarize `x`. E.g. [10,50,90].
method: Method to summarize the features. E.g. "linear"
location: Location to insert the summarization indices in the
clustered tensor defaults to adding at the end
Altered:
clustered_x: The summarization indices are added at the end of
the tensor or inserted at the specified location
_cluster_names: The names are added at the end of the tensor
or inserted at the specified location
"""
percentiles_x = np.nanpercentile(
self._padded_x[:, :, summarization_indices],
percentiles,
axis=1,
method=method,
)

percentiles_x = percentiles_x.transpose(1, 2, 0).reshape(
len(self.clustered_x), -1
)
self._add_column(percentiles_x, location)

# update the cluster names
if self._input_names is not None:
new_names = [
self._input_names[i] + "_percentile_" + str(p)
for i in summarization_indices
for p in percentiles
]
self._add_column_names(new_names, location)

def add_counts(self, location: Optional[int] = None) -> np.ndarray:
"""Add the counts of the sensor to the summarization features."""
self._add_column(np.log10(self._counts), location)
if self._input_names is not None:
new_name = ["counts"]
self._add_column_names(new_name, location)

def add_sum_charge(
self, charge_index: int, location: Optional[int] = None
) -> np.ndarray:
"""Add the sum of the charge to the summarization features."""
if not hasattr(self, "_charge_sum"):
self._calculate_charge_sum(charge_index)
self._add_column(self._charge_sum, location)
# update the cluster names
if self._input_names is not None:
new_name = [self._input_names[charge_index] + "_sum"]
self._add_column_names(new_name, location)

def add_std(
self,
columns: List[int],
location: Optional[int] = None,
weights: Union[np.ndarray, int] = 1,
) -> np.ndarray:
"""Add the standard deviation of the column.

Args:
columns: Index of the columns from which to calculate the standard
deviation.
location: Location to insert the standard deviation in the
clustered tensor defaults to adding at the end
weights: Optional weights to be applied to the standard deviation
"""
self._add_column(
np.nanstd(self._padded_x[:, :, columns] * weights, axis=1),
location,
)
if self._input_names is not None:
new_names = [self._input_names[i] + "_std" for i in columns]
self._add_column_names(new_names, location)

def add_mean(
self,
columns: List[int],
location: Optional[int] = None,
weights: Union[np.ndarray, int] = 1,
) -> np.ndarray:
"""Add the mean of the column."""
self._add_column(
np.nanmean(self._padded_x[:, :, columns] * weights, axis=1),
location,
)
# update the cluster names
if self._input_names is not None:
new_names = [self._input_names[i] + "_mean" for i in columns]
self._add_column_names(new_names, location)


def ice_transparency(
z_offset: Optional[float] = None, z_scaling: Optional[float] = None
) -> Tuple[interp1d, interp1d]:
Expand Down
Loading