From 641e4c6c274c0af0644cd5838997f8afd3a5fdad Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 15 Dec 2023 23:10:35 +0100
Subject: [PATCH 01/22] add spline funcs

---
 src/graphnet/models/flows/__init__.py      |   1 +
 src/graphnet/models/flows/spline_blocks.py | 391 +++++++++++++++++++++
 2 files changed, 392 insertions(+)
 create mode 100644 src/graphnet/models/flows/__init__.py
 create mode 100644 src/graphnet/models/flows/spline_blocks.py

diff --git a/src/graphnet/models/flows/__init__.py b/src/graphnet/models/flows/__init__.py
new file mode 100644
index 000000000..72ade601a
--- /dev/null
+++ b/src/graphnet/models/flows/__init__.py
@@ -0,0 +1 @@
+"""Module for Normalizing Flows in GraphNeT."""
diff --git a/src/graphnet/models/flows/spline_blocks.py b/src/graphnet/models/flows/spline_blocks.py
new file mode 100644
index 000000000..041421cbd
--- /dev/null
+++ b/src/graphnet/models/flows/spline_blocks.py
@@ -0,0 +1,391 @@
+"""Utility classes for SplineBlocks for NormalizingFlows in GraphNeT."""
+
+from typing import Tuple, List
+from abc import abstractmethod
+import torch
+
+from graphnet.models import Model
+
+
+class RationalQuadraticSpline(Model):
+    """Implementation of https://arxiv.org/pdf/1906.04032.pdf, page 4."""
+
+    def __init__(self, n_knots: int, b: int):
+        """Construct `RationalQuadraticSpline`.
+
+        Args:
+            n_knots: Number of knots per spline.
+            b: Bounding parameter. Input is assumed to be in [-b, b].
+        """
+        super(RationalQuadraticSpline, self).__init__()
+        self.n_knots = n_knots
+        self.b = b
+        self._softmax = torch.nn.Softmax(dim=1)
+        self._softplus = torch.nn.Softplus()
+        self._eps = 0
+
+    def forward_spline(
+        self,
+        x: torch.Tensor,
+        spline_parameters: torch.Tensor,
+    ) -> torch.Tensor:
+        """Forward spline call.
+
+        Transform a dimension `x` from the input distribution into
+        dimension `y` in the latent distribution.
+        """
+        # Checks
+        assert torch.all(
+            x.lt(self.b) & x.gt(-self.b)
+        ), f"At least one sample in `x` is out of bounds of [{-self.b}, {self.b}]: {torch.max(x, dim=1)[0]}"
+
+        # Transform knot bins to coordinates and pad as described in paper
+        knot_x, knot_y, d = self._partition_spline_parameters(
+            spline_parameters=spline_parameters
+        )
+        # Calculate knot index `k`, s and epsilon.
+        k = self._find_spline_idx(knot_x, x)
+        s = self._calculate_s(knot_y, knot_x, k)
+        epsilon = self._calculate_epsilon(knot_x, x, k)
+
+        # Evaluate spline & get Jacobian
+        y = self._evaluate_spline(s, epsilon, knot_y, d, k)
+        jacobian = self._calculate_jacobian(s, d, k, epsilon)
+        return y.reshape(-1, 1).squeeze(1), jacobian.squeeze(1)
+
+    def inverse_spline(
+        self,
+        y: torch.Tensor,
+        spline_parameters: torch.Tensor,
+    ) -> torch.Tensor:
+        """Inverse spline call.
+
+        Transform dimension `y` from the latent distribution back into
+        dimension `x` in the input distribution.
+        """
+        # Checks
+        assert torch.all(
+            y.lt(self.b) & y.gt(-self.b)
+        ), f"At least one sample in `y` is out of bounds of [{-self.b}, {self.b}]."
+
+        # Transform knot bins to coordinates and pad as described in paper
+        knot_x, knot_y, d = self._partition_spline_parameters(
+            spline_parameters=spline_parameters
+        )
+
+        # Calculate knot index `k` and s
+        k = self._find_spline_idx(knot_y, y)
+        assert (
+            max(k) + 1 <= knot_y.shape[1]
+        ), f"""{knot_y.shape} vs.
+            {max(k) + 1}"""
+        s = self._calculate_s(knot_y, knot_x, k)
+
+        # Calculate coefficients a, b and c from paper
+        a, b, c = self._calculate_abc(y=y, knot_y=knot_y, d=d, s=s, k=k)
+        x = (2 * c) / (-b - torch.sqrt(b**2 - 4 * a * c)) * (
+            torch.gather(knot_x, 1, k + 1) - torch.gather(knot_x, 1, k)
+        ) + torch.gather(knot_x, 1, k)
+        return x.reshape(-1, 1).squeeze(1)
+
+    def _calculate_abc(
+        self,
+        y: torch.Tensor,
+        knot_y: torch.Tensor,
+        d: torch.Tensor,
+        s: torch.Tensor,
+        k: torch.Tensor,
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+
+        knot_y_k_1 = torch.gather(knot_y, 1, k + 1)
+        knot_y_k = torch.gather(knot_y, 1, k)
+        d_k = torch.gather(d, 1, k)
+        d_k_1 = torch.gather(d, 1, k + 1)
+
+        a = (knot_y_k_1 - knot_y_k) * (s - d_k) + (y - knot_y_k) * (
+            d_k_1 + d_k - 2 * s
+        )
+        b = (knot_y_k_1 - knot_y_k) * d_k - (y - knot_y_k) * (
+            d_k_1 + d_k - 2 * s
+        )
+        c = -s * (y - knot_y_k)
+        return a, b, c
+
+    def _partition_spline_parameters(
+        self, spline_parameters: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+        """Convert neural network outputs into spline parameters."""
+        # Identify spline parameters from neural network output
+        knot_x_bins = spline_parameters[:, 0 : self.n_knots]
+        knot_y_bins = spline_parameters[:, self.n_knots : (2 * self.n_knots)]
+        d = spline_parameters[:, (2 * self.n_knots) : (3 * self.n_knots) - 1]
+
+        # Checks
+        assert (
+            knot_x_bins.shape[1] + knot_y_bins.shape[1] + d.shape[1]
+            == spline_parameters.shape[1]
+        )
+
+        # Transform knot bins to coordinates and pad as described in paper
+        knot_x, knot_y, d = self._setup_coordinates(
+            knot_x_bins, knot_y_bins, d
+        )
+        return knot_x, knot_y, d
+
+    def _find_spline_idx(
+        self, knots_x: torch.Tensor, x: torch.Tensor
+    ) -> torch.Tensor:
+        """Identify which spline segment a given entry in x belongs to.
+
+        Args:
+            knots_x: x-coordinates of the knots.
+            x: The domain on which the spline is being evaluated.
+
+        Returns:
+            The index of the spline segment that each entry in x belongs to.
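+
+            For example, with knots at [-b, 0, b], an entry x = 0.5 is
+            assigned segment index 1 (`torch.searchsorted` returns 2,
+            and 1 is subtracted).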
+        """
+        return (
+            torch.searchsorted(knots_x.contiguous(), x.contiguous()) - 1
+        ).contiguous()
+
+    def _calculate_s(
+        self, knot_y: torch.Tensor, knot_x: torch.Tensor, k: torch.Tensor
+    ) -> torch.Tensor:
+        return (
+            torch.gather(knot_y, 1, k + 1) - torch.gather(knot_y, 1, k)
+        ) / (torch.gather(knot_x, 1, k + 1) - torch.gather(knot_x, 1, k))
+
+    def _calculate_epsilon(
+        self, knot_x: torch.Tensor, x: torch.Tensor, k: torch.Tensor
+    ) -> torch.Tensor:
+        assert sum((x - torch.gather(knot_x, 1, k) < 0)) == 0
+        return (x - torch.gather(knot_x, 1, k)) / (
+            torch.gather(knot_x, 1, k + 1) - torch.gather(knot_x, 1, k)
+        )
+
+    def _calculate_jacobian(
+        self,
+        s: torch.Tensor,
+        d: torch.Tensor,
+        k: torch.Tensor,
+        epsilon: torch.Tensor,
+    ) -> torch.Tensor:
+        """Evaluate Eq. 5 in https://arxiv.org/pdf/1906.04032.pdf, page 4."""
+        numerator = s**2 * (
+            torch.gather(d, 1, k + 1) * epsilon**2
+            + 2 * s * epsilon * (1 - epsilon)
+            + torch.gather(d, 1, k) * (1 - epsilon) ** 2
+        )
+        denominator = (
+            s
+            + (torch.gather(d, 1, k + 1) + torch.gather(d, 1, k) - 2 * s)
+            * epsilon
+            * (1 - epsilon)
+        ) ** 2
+        jac = numerator / denominator
+        return jac
+
+    def _evaluate_spline(
+        self,
+        s: torch.Tensor,
+        epsilon: torch.Tensor,
+        knot_y: torch.Tensor,
+        d: torch.Tensor,
+        k: torch.Tensor,
+    ) -> torch.Tensor:
+        """Evaluate Eq. 4 in https://arxiv.org/pdf/1906.04032.pdf, page 4."""
+        numerator = (
+            torch.gather(knot_y, 1, k + 1) - torch.gather(knot_y, 1, k)
+        ) * (
+            s * epsilon * epsilon
+            + torch.gather(d, 1, k) * epsilon * (1 - epsilon)
+        )
+        denominator = s + (
+            torch.gather(d, 1, k + 1) + torch.gather(d, 1, k) - 2 * s
+        ) * epsilon * (1 - epsilon)
+        return torch.gather(knot_y, 1, k) + numerator / denominator
+
+    def _transform_to_internal_coordinates(
+        self, x: torch.Tensor
+    ) -> torch.Tensor:
+        return self.b * (torch.cumsum(self._softmax(x), 1) * 2 - 1)
+
+    def _setup_coordinates(
+        self,
+        knot_x_bins: torch.Tensor,
+        knot_y_bins: torch.Tensor,
+        d: torch.Tensor,
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+        """Convert NN outputs for knot bins to coordinates and pad."""
+        knot_x = self._transform_to_internal_coordinates(knot_x_bins)
+        knot_y = self._transform_to_internal_coordinates(knot_y_bins)
+        knot_x = torch.nn.functional.pad(knot_x, (1, 0))
+        knot_x[:, 0] = -self.b - self._eps
+        knot_x[:, -1] = knot_x[:, -1] + self._eps
+
+        knot_y = torch.nn.functional.pad(knot_y, (1, 0))
+        knot_y[:, 0] = -self.b - self._eps
+        knot_y[:, -1] = knot_y[:, -1] + self._eps
+
+        d = self._softplus(d)
+        d = torch.nn.functional.pad(d, (1, 1), value=1.0)
+        return knot_x, knot_y, d
+
+
+class SplineBlock(RationalQuadraticSpline):
+    """Generic SplineBlock class."""
+
+    def __init__(self, b: int, n_knots: int) -> None:
+        """Construct `SplineBlock`.
+
+        Args:
+            b: Bounding parameter. Input is assumed to be in [-b, b].
+            n_knots: Number of knots per spline.
+        """
+        super(SplineBlock, self).__init__(b=b, n_knots=n_knots)
+
+    @abstractmethod
+    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Logic of the forward partition.
+
+        Must return y and the Jacobian.
+        """
+
+    @abstractmethod
+    def inverse(self, y: torch.Tensor) -> torch.Tensor:
+        """Logic of the inverse partition.
+
+        Must return x.
+        """
+
+
+class TwoPartitionSplineBlock(SplineBlock):
+    """TwoPartitionSplineBlock.
+
+    A SplineBlock that partitions the input dimensions in two parts.
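+
+    The spline parameters used to transform one partition are predicted
+    from the other partition, in the style of coupling layers, which is
+    what keeps the block invertible.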
+    """
+
+    def __init__(
+        self,
+        b: int,
+        n_knots: int,
+        nn_0: torch.nn.Sequential,
+        nn_1: torch.nn.Sequential,
+        partition: Tuple[List[int], List[int]],
+        input_dim: int,
+    ):
+        """Construct `TwoPartitionSplineBlock`.
+
+        Args:
+            b: Bounding parameter. Input dimensions are each assumed to be
+                in [-b, b].
+            n_knots: Number of knots per spline.
+            nn_0: Neural network used to transform first partition.
+            nn_1: Neural network used to transform second partition.
+            partition: A two-partition partition.
+            input_dim: Number of input dimensions.
+        """
+        super(TwoPartitionSplineBlock, self).__init__(b=b, n_knots=n_knots)
+        self._input_dim = input_dim
+        self.nn_0 = nn_0
+        self.nn_1 = nn_1
+        self.partition = partition
+
+    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Forward transformation.
+
+        Transform sample `x` from the input distribution into `y` from the
+        latent distribution.
+        """
+        x_0 = x[:, self.partition[0]]
+        x_1 = x[:, self.partition[1]]
+
+        spline_params_1 = self.nn_0(x_0) / 100
+        # if torch.sum(spline_params_1.isnan()==True) > 1:
+        # self.error("NaN encountered in spline parameterization!")
+        # assert torch.sum(spline_params_1.isnan()==True) == 0
+        y_1, jacobian_1 = self.apply_splines_to_each_dimension(
+            x=x_1, spline_parameters=spline_params_1
+        )
+        spline_params_0 = self.nn_1(y_1) / 100
+        # if torch.sum(spline_params_0.isnan()==True) > 1:
+        # self.error("NaN encountered in spline parameterization!")
+        # assert torch.sum(spline_params_0.isnan()==True) == 0
+        y_0, jacobian_0 = self.apply_splines_to_each_dimension(
+            x=x_0, spline_parameters=spline_params_0
+        )
+        jac = torch.cat([jacobian_0, jacobian_1], dim=1)
+        return torch.cat([y_0, y_1], dim=1), jac
+
+    def inverse(self, y: torch.Tensor) -> torch.Tensor:
+        """Inverse transformation.
+
+        Transform sample from the latent distribution `y` into sample `x`
+        from the input distribution.
+        """
+        y_0 = y[:, self.partition[0]]
+        y_1 = y[:, self.partition[1]]
+
+        spline_params_0 = self.nn_1(y_1)
+        x_0 = self.invert_splines_for_each_dimension(
+            y=y_0, spline_parameters=spline_params_0
+        )
+
+        spline_params_1 = self.nn_0(x_0)
+        x_1 = self.invert_splines_for_each_dimension(
+            y=y_1, spline_parameters=spline_params_1
+        )
+        return torch.cat([x_0, x_1], dim=1)
+
+    def apply_splines_to_each_dimension(
+        self, x: torch.Tensor, spline_parameters: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Apply forward call from splines to each dimension in `x`.
+
+        Args:
+            x: sample to transform.
+            spline_parameters: Parameters for splines.
+
+        Returns:
+            transformed sample `y` and associated Jacobian.
+        """
+        y_partition = torch.zeros(x.shape).to(x.device)
+        jacobian_partition = torch.zeros(x.shape).to(x.device)
+        for dim in range(x.shape[1]):
+            parameter_slice = spline_parameters[
+                :,
+                ((3 * self.n_knots - 1) * dim) : (3 * self.n_knots - 1)
+                * (1 + dim),
+            ]
+            y_dim, jacobian_dim = self.forward_spline(
+                x=x[:, dim].reshape(-1, 1), spline_parameters=parameter_slice
+            )
+            y_partition[:, dim] = y_dim
+            jacobian_partition[:, dim] = jacobian_dim
+
+        return y_partition, jacobian_partition
+
+    def invert_splines_for_each_dimension(
+        self, y: torch.Tensor, spline_parameters: torch.Tensor
+    ) -> torch.Tensor:
+        """Apply inverse call from splines to each dimension in `y`.
+
+        Args:
+            y: sample to transform.
+            spline_parameters: Parameters for splines.
+
+        Returns:
+            transformed sample `x`.
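+
+            The same parameter slicing as in the forward call is used,
+            so each dimension is inverted by the spline that
+            transformed it.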
+        """
+        x_partition = torch.zeros(y.shape).to(y.device)
+        for dim in range(y.shape[1]):
+            parameter_slice = spline_parameters[
+                :,
+                ((3 * self.n_knots - 1) * dim) : (3 * self.n_knots - 1)
+                * (1 + dim),
+            ]
+            x_dim = self.inverse_spline(
+                y=y[:, dim].reshape(-1, 1), spline_parameters=parameter_slice
+            )
+            x_partition[:, dim] = x_dim
+
+        return x_partition

From 117c799204b18e9e3c36c8e15045d3639c4aab12 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 15 Dec 2023 23:13:58 +0100
Subject: [PATCH 02/22] add NormalizingFlow

---
 src/graphnet/models/flows/__init__.py         |  1 +
 src/graphnet/models/flows/normalizing_flow.py | 39 +++++++++++++++++++
 2 files changed, 40 insertions(+)
 create mode 100644 src/graphnet/models/flows/normalizing_flow.py

diff --git a/src/graphnet/models/flows/__init__.py b/src/graphnet/models/flows/__init__.py
index 72ade601a..bdee644d6 100644
--- a/src/graphnet/models/flows/__init__.py
+++ b/src/graphnet/models/flows/__init__.py
@@ -1 +1,2 @@
 """Module for Normalizing Flows in GraphNeT."""
+from .normalizing_flow import NormalizingFlow
diff --git a/src/graphnet/models/flows/normalizing_flow.py b/src/graphnet/models/flows/normalizing_flow.py
new file mode 100644
index 000000000..0efbb5601
--- /dev/null
+++ b/src/graphnet/models/flows/normalizing_flow.py
@@ -0,0 +1,39 @@
+"""Base flow-specific `Model` class(es)."""
+
+from abc import abstractmethod
+
+from torch import Tensor
+from torch_geometric.data import Data
+
+from graphnet.models import Model
+
+
+class NormalizingFlow(Model):
+    """Base class for all core Normalizing Flow models in GraphNeT."""
+
+    def __init__(self, nb_inputs: int):
+        """Construct `NormalizingFlow`."""
+        # Base class constructor
+        super().__init__()
+
+        # Member variables
+        self._nb_inputs = nb_inputs
+        self._nb_outputs = nb_inputs  # Normalizing flows are bijective
+
+    @property
+    def nb_inputs(self) -> int:
+        """Return number of input features."""
+        return self._nb_inputs
+
+    @property
+    def nb_outputs(self) -> int:
+        """Return number of output features."""
+        return self._nb_outputs
+
+    @abstractmethod
+    def forward(self, data: Data) -> Tensor:
+        """Transform from input distribution into latent distribution."""
+
+    @abstractmethod
+    def inverse(self, data: Data) -> Tensor:
+        """Transform from latent distribution to input distribution."""

From 807bbd568555af579da91c3920239525d5aac276 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 15 Dec 2023 23:27:09 +0100
Subject: [PATCH 03/22] add INGA

---
 src/graphnet/models/flows/inga.py | 133 ++++++++++++++++++++++++++++++
 1 file changed, 133 insertions(+)
 create mode 100644 src/graphnet/models/flows/inga.py

diff --git a/src/graphnet/models/flows/inga.py b/src/graphnet/models/flows/inga.py
new file mode 100644
index 000000000..b48a94d89
--- /dev/null
+++ b/src/graphnet/models/flows/inga.py
@@ -0,0 +1,133 @@
+"""Normalizing flow using parameterized splines.
+
+Implemented by Rasmus Ørsøe, 2023.
+"""
+from typing import List, Tuple
+
+import torch
+
+from graphnet.models.flows import NormalizingFlow
+from graphnet.models.flows.spline_blocks import (
+    SplineBlock,
+    TwoPartitionSplineBlock,
+)
+from torch_geometric.data import Data
+
+
+class INGA(NormalizingFlow):
+    """Implementation of spline-based neural flows.
+
+    Inspired by https://arxiv.org/pdf/1906.04032.pdf
+    """
+
+    def __init__(
+        self,
+        nb_inputs: int,
+        b: int = 100,
+        n_knots: int = 5,
+        num_blocks: int = 1,
+        partitions: List[Tuple[List[int], List[int]]] = None,
+        c: int = 1,
+    ):
+        """Construct INGA.
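+
+        The flow chains `num_blocks` `TwoPartitionSplineBlock`s; the
+        Jacobians of the individual blocks are multiplied together to
+        form the Jacobian of the full transformation.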
+
+        Args:
+            nb_inputs: Number of input dimensions to be transformed.
+            b: The bounding parameter.
+                All input dimensions are assumed to be in the range [-b, b].
+                Defaults to 100.
+            n_knots: Number of knots per spline. Defaults to 5.
+            num_blocks: Number of spline blocks. Defaults to 1.
+            partitions: A set of partitions that designate which dimensions
+                of the input are used to transform each other.
+                E.g. [[0,1,2,3,4], [5,6,7,8,9]] (for a 10-dimensional case)
+                means dimensions 0 through 4 are used to transform
+                dimensions 5 through 9 and vice versa.
+                Defaults to None, which will create an even partition.
+            c: Scaling parameter for the neural network.
+        """
+        super().__init__(nb_inputs)
+
+        # Set Member variables
+        self.n_knots = n_knots
+        self.num_blocks = num_blocks
+
+        if partitions is None:
+            partitions = self._create_default_partitions()
+
+        self.partitions = partitions
+
+        # checks
+        assert len(partitions) == self.num_blocks
+
+        # constants
+        spline_blocks = []
+        for k in range(num_blocks):
+            nn_0_dim = len(partitions[k][0])
+            nn_1_dim = len(partitions[k][1])
+            spline_blocks.append(
+                TwoPartitionSplineBlock(
+                    b=b,
+                    n_knots=n_knots,
+                    input_dim=self.nb_inputs,
+                    nn_0=torch.nn.Sequential(
+                        torch.nn.Linear(nn_0_dim, nn_0_dim * c),
+                        torch.nn.ReLU(),
+                        torch.nn.Linear(
+                            nn_0_dim * c, nn_1_dim * (n_knots * 3)
+                        ),
+                    ),  # ((3*self.n_knots-1)*dim)
+                    nn_1=torch.nn.Sequential(
+                        torch.nn.Linear(nn_1_dim, nn_1_dim * c),
+                        torch.nn.ReLU(),
+                        torch.nn.Linear(
+                            nn_1_dim * c, nn_0_dim * (n_knots * 3)
+                        ),
+                    ),
+                    partition=partitions[k],
+                )
+            )
+
+        self.spline_blocks = torch.nn.ModuleList(spline_blocks)
+
+    def _create_default_partitions(self) -> List[Tuple[List[int], List[int]]]:
+        default_partition = (
+            [i for i in range(0, int(self.nb_inputs / 2))],
+            [k for k in range(int(self.nb_inputs / 2), self.nb_inputs)],
+        )
+        partitions = []
+        for _ in range(self.num_blocks):
+            partitions.append(default_partition)
+        return partitions
+
+    def forward(self, data: Data) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Forward call.
+
+        Will transform sample from input distribution to latent distribution.
+        """
+        is_first = True
+        c = 0
+        x = data.x
+        for spline_block in self.spline_blocks:
+            # self.info(f"spline block {c}")
+            if is_first:
+                y, partition_jacobian = spline_block(x=x)
+                global_jacobian = partition_jacobian
+                is_first = False
+            else:
+                y, partition_jacobian = spline_block(x=y)
+                global_jacobian *= partition_jacobian
+            c += 1
+        return y, global_jacobian
+
+    def inverse(self, y: torch.Tensor) -> torch.Tensor:
+        """Inverse call.
+
+        Will transform sample from latent distribution to input distribution.
+        """
+        reversed_index = list(range(0, len(self.spline_blocks)))[
+            ::-1
+        ]  # 6, 5, 4 ..
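+        # Invert the blocks in reverse order of the forward pass.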
+        for idx in reversed_index:
+            y = self.spline_blocks[idx].inverse(y=y)
+        return self.inverse_transform(y)

From dc0d8f3ff018fa2e422deda3ce4e867a6d4b8e4e Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 15 Dec 2023 23:57:48 +0100
Subject: [PATCH 04/22] add example

---
 .../04_training/05_train_normalizing_flow.py | 162 ++++++++++++++++++
 src/graphnet/models/flows/__init__.py        |   1 +
 2 files changed, 163 insertions(+)
 create mode 100644 examples/04_training/05_train_normalizing_flow.py

diff --git a/examples/04_training/05_train_normalizing_flow.py b/examples/04_training/05_train_normalizing_flow.py
new file mode 100644
index 000000000..0c463c540
--- /dev/null
+++ b/examples/04_training/05_train_normalizing_flow.py
@@ -0,0 +1,162 @@
+"""Example of training Model."""
+
+import os
+from typing import Any, Dict, List, Optional
+
+from pytorch_lightning.loggers import WandbLogger
+import numpy as np
+import pandas as pd
+
+from graphnet.constants import EXAMPLE_DATA_DIR, EXAMPLE_OUTPUT_DIR
+from graphnet.data.constants import FEATURES, TRUTH
+from graphnet.models import StandardModel
+from graphnet.models.detector.prometheus import Prometheus
+from graphnet.models.flows import INGA
+from graphnet.models.graphs import GraphDefinition
+from graphnet.models.graphs.nodes import NodesAsPulses
+
+from graphnet.models.task import StandardFlowTask
+from graphnet.training.loss_functions import (  # type: ignore
+    LogCoshLoss,  # MultivariateGaussianFlowLoss,  # type: ignore
+)  # type: ignore
+from graphnet.training.utils import make_train_validation_dataloader
+from graphnet.utilities.argparse import ArgumentParser
+from graphnet.utilities.logging import Logger
+
+# Constants
+features = FEATURES.PROMETHEUS
+truth = TRUTH.PROMETHEUS
+
+
+def main(
+    path: str,
+    pulsemap: str,
+    target: str,
+    truth_table: str,
+    gpus: Optional[List[int]],
+    max_epochs: int,
+    early_stopping_patience: int,
+    batch_size: int,
+    num_workers: int,
+) -> None:
+    """Run example."""
+    # Construct Logger
+    logger = Logger()
+    # Configuration
+    config: Dict[str, Any] = {
+        "path": path,
+        "pulsemap": pulsemap,
+        "batch_size": batch_size,
+        "num_workers": num_workers,
+        "target": target,
+        "early_stopping_patience": early_stopping_patience,
+        "fit": {
+            "gpus": gpus,
+            "max_epochs": max_epochs,
+        },
+    }
+
+    archive = os.path.join(EXAMPLE_OUTPUT_DIR, "train_model_without_configs")
+    run_name = "INGA_example_1mio"
+
+    # Define graph representation
+    detector = Prometheus()
+
+    graph_definition = GraphDefinition(
+        detector=detector,
+        node_definition=NodesAsPulses(),
+        input_feature_names=input_feature_names,
+    )
+    (
+        training_dataloader,
+        validation_dataloader,
+    ) = make_train_validation_dataloader(
+        db=config["path"],
+        graph_definition=graph_definition,
+        pulsemaps=config["pulsemap"],
+        features=features,
+        truth=truth,
+        batch_size=config["batch_size"],
+        num_workers=config["num_workers"],
+        truth_table=truth_table,
+        selection=None,
+    )
+
+    # Building model
+    flow = INGA(
+        nb_inputs=graph_definition.nb_outputs,
+        n_knots=120,
+        num_blocks=4,
+        b=100,
+        c=100,
+    )
+    task = StandardFlowTask(
+        target_labels=graph_definition.output_feature_names,
+        prediction_labels=graph_definition.output_feature_names,
+        loss_function=LogCoshLoss,  # MultivariateGaussianFlowLoss(),
+    )
+    model = StandardModel(
+        graph_definition=graph_definition,
+        backbone=flow,
+        tasks=[task],
+    )
+
+    model.fit(
+        training_dataloader,
+        validation_dataloader,
+        **config["fit"],
+    )
+    results = model.predict_as_dataframe(
+        validation_dataloader,
+        additional_attributes=["event_no"],
+    )
+
+    # Save predictions and model to file
+    db_name = path.split("/")[-1].split(".")[0]
+    path = os.path.join(archive, db_name, run_name)
+    logger.info(f"Writing results to {path}")
+    os.makedirs(path, exist_ok=True)
+
+    # Save results as .csv
+    results.to_csv(f"{path}/results.csv")
+
+    # Save full model (including weights) to .pth file - not version safe
+    # Note: Models saved as .pth files in one version of graphnet
+    # may not be compatible with a different version of graphnet.
+    model.save(f"{path}/model.pth")
+
+    # Save model config and state dict - Version safe save method.
+    # This method of saving models is the safest way.
+    model.save_state_dict(f"{path}/state_dict.pth")
+    model.save_config(f"{path}/model_config.yml")
+
+
+if __name__ == "__main__":
+    database = "/mnt/scratch/rasmus_orsoe/databases/dev_level2_oscNext_pulsenoise_full_v4_remerge_v2/data/dev_level2_oscNext_pulsenoise_full_v4_remerge_v2_part_1.db"
+    pulsemap = "SRTTWOfflinePulsesDC"
+    target = ""
+    truth_table = "truth"
+    gpus = [2]
+    max_epochs = 400
+    early_stopping_patience = 16
+    batch_size = 500
+    num_workers = 30
+    input_feature_names = ["dom_x", "dom_y", "dom_z", "dom_time"]
+    string_selection = [83.0, 84.0, 85.0, 86.0]
+
+    string_mask = []
+    for string in np.arange(0, 87):
+        if string not in string_selection:
+            string_mask.append(string)
+
+    main(
+        path=database,
+        pulsemap=pulsemap,
+        target=target,
+        truth_table=truth_table,
+        gpus=gpus,
+        max_epochs=max_epochs,
+        early_stopping_patience=early_stopping_patience,
+        batch_size=batch_size,
+        num_workers=num_workers,
+    )
diff --git a/src/graphnet/models/flows/__init__.py b/src/graphnet/models/flows/__init__.py
index bdee644d6..5541462af 100644
--- a/src/graphnet/models/flows/__init__.py
+++ b/src/graphnet/models/flows/__init__.py
@@ -1,2 +1,3 @@
 """Module for Normalizing Flows in GraphNeT."""
 from .normalizing_flow import NormalizingFlow
+from .inga import INGA

From 7d5e6de1bae54bcdceec69ab4cb988afcc8b4bcb Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 15 Dec 2023 23:59:30 +0100
Subject: [PATCH 05/22] add MultivariateGaussianFlowLoss

---
 src/graphnet/training/loss_functions.py | 23 +++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/graphnet/training/loss_functions.py b/src/graphnet/training/loss_functions.py
index 624a5fa53..bde04071e 100644
--- a/src/graphnet/training/loss_functions.py
+++ b/src/graphnet/training/loss_functions.py
@@ -443,3 +443,26 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor:
         kappa = prediction[:, 3]
         p = kappa.unsqueeze(1) * prediction[:, [0, 1, 2]]
         return self._evaluate(p, target)
+
+
+class MultivariateGaussianFlowLoss(LossFunction):
+    """Loss for transforming input distribution to Multivariate Gaussian.
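+
+    Minimizing this loss maximizes the log-likelihood of the transformed
+    samples under a standard normal via the change-of-variables formula
+    log p(x) = log p_z(f(x)) + log |det J_f(x)|.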
+
+    Intended for use by `NormalizingFlow`s.
+    """
+
+    def __init__(self, **kwargs: Any) -> None:
+        """Construct `MultivariateGaussianFlowLoss`."""
+        super().__init__(**kwargs)
+        self._const = torch.tensor(2 * torch.pi, dtype=torch.float)
+
+    def _forward(  # type: ignore
+        self, prediction: Tensor, jacobian: Tensor, target: Tensor
+    ) -> Tensor:
+        assert prediction.shape == jacobian.shape
+        loss = -torch.mean(
+            torch.log(jacobian)
+            - (torch.pow(prediction, 2) + torch.log(self._const)) / 2,
+            dim=1,
+        )
+        return loss

From 82fa7077f62696dc990084855d7ef270717568a3 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Sat, 16 Dec 2023 00:22:13 +0100
Subject: [PATCH 06/22] example polish

---
 .../04_training/05_train_normalizing_flow.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/examples/04_training/05_train_normalizing_flow.py b/examples/04_training/05_train_normalizing_flow.py
index 0c463c540..f0a3daab0 100644
--- a/examples/04_training/05_train_normalizing_flow.py
+++ b/examples/04_training/05_train_normalizing_flow.py
@@ -16,9 +16,9 @@
 from graphnet.models.graphs.nodes import NodesAsPulses
 
 from graphnet.models.task import StandardFlowTask
-from graphnet.training.loss_functions import (  # type: ignore
-    LogCoshLoss,  # MultivariateGaussianFlowLoss,  # type: ignore
-)  # type: ignore
+from graphnet.training.loss_functions import (
+    MultivariateGaussianFlowLoss,
+)
 from graphnet.training.utils import make_train_validation_dataloader
 from graphnet.utilities.argparse import ArgumentParser
 from graphnet.utilities.logging import Logger
@@ -65,7 +65,7 @@ def main(
     graph_definition = GraphDefinition(
         detector=detector,
         node_definition=NodesAsPulses(),
-        input_feature_names=input_feature_names,
+        input_feature_names=features,
     )
     (
         training_dataloader,
@@ -93,7 +93,7 @@ def main(
     task = StandardFlowTask(
         target_labels=graph_definition.output_feature_names,
         prediction_labels=graph_definition.output_feature_names,
-        loss_function=LogCoshLoss,  # MultivariateGaussianFlowLoss(),
+        loss_function=MultivariateGaussianFlowLoss(),
     )
     model = StandardModel(
         graph_definition=graph_definition,
@@ -132,16 +132,15 @@ def main(
 
 
 if __name__ == "__main__":
-    database = "/mnt/scratch/rasmus_orsoe/databases/dev_level2_oscNext_pulsenoise_full_v4_remerge_v2/data/dev_level2_oscNext_pulsenoise_full_v4_remerge_v2_part_1.db"
-    pulsemap = "SRTTWOfflinePulsesDC"
+    database = "/home/iwsatlas1/oersoe/github/graphnet/data/examples/sqlite/prometheus/prometheus-events.db"
+    pulsemap = "total"
     target = ""
-    truth_table = "truth"
-    gpus = [2]
+    truth_table = "mc_truth"
+    gpus = None
     max_epochs = 400
     early_stopping_patience = 16
     batch_size = 500
     num_workers = 30
-    input_feature_names = ["dom_x", "dom_y", "dom_z", "dom_time"]
     string_selection = [83.0, 84.0, 85.0, 86.0]
 
     string_mask = []

From 55bf1f49165e63cf4dd4be77f9f6f29dd660786c Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Sat, 16 Dec 2023 00:30:48 +0100
Subject: [PATCH 07/22] standard model checks

---
 src/graphnet/models/standard_model.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index 663664996..f35f7bee6 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -17,8 +17,9 @@
 from graphnet.training.callbacks import ProgressBar
 from graphnet.models.graphs import GraphDefinition
 from graphnet.models.gnn.gnn import GNN
+from graphnet.models.flows import NormalizingFlow
 from graphnet.models.model import Model
-from graphnet.models.task import StandardLearnedTask
+from graphnet.models.task import StandardLearnedTask, StandardFlowTask
 
 
 class StandardModel(Model):
@@ -46,10 +47,13 @@ def __init__(
         super().__init__(name=__name__, class_name=self.__class__.__name__)
 
         # Check(s)
-        if isinstance(tasks, StandardLearnedTask):
+        if isinstance(tasks, (StandardLearnedTask, StandardFlowTask)):
             tasks = [tasks]
         assert isinstance(tasks, (list, tuple))
-        assert all(isinstance(task, StandardLearnedTask) for task in tasks)
+        assert all(
+            isinstance(task, (StandardLearnedTask, StandardFlowTask))
+            for task in tasks
+        )
         assert isinstance(graph_definition, GraphDefinition)
 
         # deprecation warnings
@@ -64,7 +68,7 @@ def __init__(
             raise TypeError(
                 "__init__() missing 1 required keyword-only argument: 'backbone'"
             )
-        assert isinstance(backbone, GNN)
+        assert isinstance(backbone, (GNN, NormalizingFlow))
 
         # Member variable(s)
         self._graph_definition = graph_definition

From 3563ec954ce246a43960f85b6d17d06e08074c0f Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Sat, 16 Dec 2023 00:43:42 +0100
Subject: [PATCH 08/22] update forward in standardmodel

---
 src/graphnet/models/standard_model.py | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index f35f7bee6..a5dcfd1c3 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -1,6 +1,6 @@
 """Standard model class(es)."""
 from collections import OrderedDict
-from typing import Any, Dict, List, Optional, Union, Type
+from typing import Any, Dict, List, Optional, Union, Type, Tuple
 
 import numpy as np
 import torch
@@ -238,18 +238,27 @@ def configure_optimizers(self) -> Dict[str, Any]:
 
     def forward(
         self, data: Union[Data, List[Data]]
-    ) -> List[Union[Tensor, Data]]:
+    ) -> Union[List[Union[Tensor, Data]], Tuple[Tensor, Tensor]]:
         """Forward pass, chaining model components."""
         if isinstance(data, Data):
             data = [data]
         x_list = []
+        jacobian_list = []
         for d in data:
-            x = self.backbone(d)
-            x_list.append(x)
+            x = self._architecture(d)
+            if isinstance(self._architecture, NormalizingFlow):
+                x_list.append(x[0])  # the embedding
+                jacobian_list.append(x[1])  # the jacobian
+            else:
+                x_list.append(x)
         x = torch.cat(x_list, dim=0)
-
-        preds = [task(x) for task in self._tasks]
-        return preds
+        if isinstance(self._architecture, NormalizingFlow):
+            jacobian = torch.cat(jacobian_list, dim=0)
+            preds = [task(x, jacobian) for task in self._tasks]
+            return preds, jacobian
+        else:
+            preds = [task(x) for task in self._tasks]
+            return preds
 
     def shared_step(self, batch: List[Data], batch_idx: int) -> Tensor:
         """Perform shared step.
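
With this change, calling a `StandardModel` whose backbone is a
`NormalizingFlow` returns a `(predictions, jacobian)` pair, while GNN
backbones keep returning the plain list of task outputs. A minimal
sketch of what calling code can expect at this point in the series
(`model` and `batch` are placeholders):

    preds, jacobian = model(batch)  # backbone is a NormalizingFlow
    preds = model(batch)            # backbone is a GNN (unchanged)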

From 6402d0b46d9d744182c72c333e3d2a5a3d62d735 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Sat, 16 Dec 2023 00:45:37 +0100
Subject: [PATCH 09/22] backbone rename

---
 src/graphnet/models/standard_model.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index a5dcfd1c3..11cd33775 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -245,14 +245,14 @@ def forward(
         x_list = []
         jacobian_list = []
         for d in data:
-            x = self._architecture(d)
-            if isinstance(self._architecture, NormalizingFlow):
+            x = self.backbone(d)
+            if isinstance(self.backbone, NormalizingFlow):
                 x_list.append(x[0])  # the embedding
                 jacobian_list.append(x[1])  # the jacobian
             else:
                 x_list.append(x)
         x = torch.cat(x_list, dim=0)
-        if isinstance(self._architecture, NormalizingFlow):
+        if isinstance(self.backbone, NormalizingFlow):
             jacobian = torch.cat(jacobian_list, dim=0)
             preds = [task(x, jacobian) for task in self._tasks]
             return preds, jacobian

From 10c6af630b4ebe542eae9d6943399689a33df24d Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Mon, 18 Dec 2023 15:19:41 +0100
Subject: [PATCH 10/22] polish

---
 src/graphnet/models/standard_model.py   | 60 ++++++++++++++++---------
 src/graphnet/models/task/task.py        |  8 +++-
 src/graphnet/training/loss_functions.py | 58 +++++++++++++++++-------
 3 files changed, 87 insertions(+), 39 deletions(-)

diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index 11cd33775..fa7cf05f0 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -238,25 +238,30 @@ def configure_optimizers(self) -> Dict[str, Any]:
 
     def forward(
         self, data: Union[Data, List[Data]]
-    ) -> Union[List[Union[Tensor, Data]], Tuple[Tensor, Tensor]]:
+    ) -> Union[Tensor, Tuple[Tensor, Tensor]]:
         """Forward pass, chaining model components."""
         if isinstance(data, Data):
             data = [data]
-        x_list = []
-        jacobian_list = []
-        for d in data:
-            x = self.backbone(d)
-            if isinstance(self.backbone, NormalizingFlow):
-                x_list.append(x[0])  # the embedding
-                jacobian_list.append(x[1])  # the jacobian
-            else:
-                x_list.append(x)
-        x = torch.cat(x_list, dim=0)
-        if isinstance(self.backbone, NormalizingFlow):
-            jacobian = torch.cat(jacobian_list, dim=0)
-            preds = [task(x, jacobian) for task in self._tasks]
-            return preds, jacobian
-        else:
+
+        if isinstance(self.backbone, NormalizingFlow):
+            x_list = []
+            jacobian_list = []
+            for d in data:
+                x, jacobian = self.backbone(d)
+                x_list.append(x)
+                jacobian_list.append(jacobian)
+            x = torch.cat(x_list, dim=0)
+            jac = torch.cat(jacobian_list, dim=0)
+            preds = [task(x, jac) for task in self._tasks]
+            return preds, jac
+
+        elif isinstance(self.backbone, GNN):
+            x_list = []
+            for d in data:
+                x = self.backbone(d)
+                x_list.append(x)
+            x = torch.cat(x_list, dim=0)
+
             preds = [task(x) for task in self._tasks]
             return preds
 
     def shared_step(self, batch: List[Data], batch_idx: int) -> Tensor:
         """Perform shared step.
 
         Applies the forward pass and the following loss calculation, shared
         between the training and validation step.
""" - preds = self(batch) - loss = self.compute_loss(preds, batch) + if isinstance(self.backbone, NormalizingFlow): + preds, jacobian = self(batch) + loss = self.compute_loss( + preds=preds, data=batch, kwargs={"jacobian": jacobian} + ) + elif isinstance(self.backbone, GNN): + preds = self(batch) + loss = self.compute_loss(preds=preds, data=batch) return loss def training_step( @@ -307,7 +318,11 @@ def validation_step( return loss def compute_loss( - self, preds: Tensor, data: List[Data], verbose: bool = False + self, + preds: Union[List[Tensor], List[List[Tensor]]], + data: List[Data], + verbose: bool = False, + **kwargs: Any, ) -> Tensor: """Compute and sum losses across tasks.""" data_merged = {} @@ -320,10 +335,11 @@ def compute_loss( [d[task._loss_weight] for d in data], dim=0 ) - losses = [ - task.compute_loss(pred, data_merged) - for task, pred in zip(self._tasks, preds) - ] + losses = [] + for task in self._tasks: + losses.append( + task.compute_loss(predictions=preds, data=data, **kwargs) + ) if verbose: self.info(f"{losses}") assert all( diff --git a/src/graphnet/models/task/task.py b/src/graphnet/models/task/task.py index cd750f35d..2534600bc 100644 --- a/src/graphnet/models/task/task.py +++ b/src/graphnet/models/task/task.py @@ -310,7 +310,9 @@ def _forward(self, x: Union[Tensor, Data]) -> Union[Tensor, Data]: """Syntax like `.forward`, for implentation in inheriting classes.""" @final - def compute_loss(self, pred: Union[Tensor, Data], data: Data) -> Tensor: + def compute_loss( + self, prediction: Union[Tensor, Data], data: Data + ) -> Tensor: """Compute supervised learning loss. Grabs truth labels in `data` and sends both `pred` and `target` to loss @@ -325,7 +327,9 @@ def compute_loss(self, pred: Union[Tensor, Data], data: Data) -> Tensor: else: weights = None loss = ( - self._loss_function(pred, target, weights=weights) + self._loss_function( + prediction=prediction, target=target, weights=weights + ) + self._regularisation_loss ) return loss diff --git a/src/graphnet/training/loss_functions.py b/src/graphnet/training/loss_functions.py index bde04071e..7a26f7b8f 100644 --- a/src/graphnet/training/loss_functions.py +++ b/src/graphnet/training/loss_functions.py @@ -6,6 +6,7 @@ from abc import abstractmethod from typing import Any, Optional, Union, List, Dict +import inspect import numpy as np import scipy.special @@ -37,6 +38,7 @@ def forward( # type: ignore[override] target: Tensor, weights: Optional[Tensor] = None, return_elements: bool = False, + jacobian: Tensor = None, ) -> Tensor: """Forward pass for all loss functions. @@ -45,12 +47,24 @@ def forward( # type: ignore[override] target: Tensor containing targets. Shape [N,T] return_elements: Whether elementwise loss terms should be returned. The alternative is to return the averaged loss across examples. + jacobian: Jacobian from `NormalizingFlows` Returns: Loss, either averaged to a scalar (if `return_elements = False`) or elementwise terms with shape [N,] (if `return_elements = True`). 
""" - elements = self._forward(prediction, target) + # Toggle between LossFunction Types + if isinstance(self, FlowLossFunction): + elements = self._forward( + prediction=prediction, target=target, jacobian=jacobian + ) + elif isinstance(self, StandardLossFunction): + elements = self._forward(prediction=prediction, target=target) + else: + raise NotImplementedError( + f"{self.__class__.__name__} is not supported by `StandardModel`" + ) + if weights is not None: elements = elements * weights assert elements.size(dim=0) == target.size( @@ -59,12 +73,30 @@ def forward( # type: ignore[override] return elements if return_elements else torch.mean(elements) + +class StandardLossFunction(LossFunction): + """Standard loss function stucture for supervised learning. + + _forward call recieves prediction and target and computes a loss. + """ + @abstractmethod def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: """Syntax like `.forward`, for implentation in inheriting classes.""" -class MSELoss(LossFunction): +class FlowLossFunction(LossFunction): + """Loss function stucture for `NormalizingFlow`. + + _forward call recieves prediction and jacobian and constructs a loss. + """ + + @abstractmethod + def _forward(self, prediction: Tensor, jacobian: Tensor) -> Tensor: + """Syntax like `.forward`, for implentation in inheriting classes.""" + + +class MSELoss(StandardLossFunction): """Mean squared error loss.""" def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: @@ -88,7 +120,7 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: return elements -class LogCoshLoss(LossFunction): +class LogCoshLoss(StandardLossFunction): """Log-cosh loss function. Acts like x^2 for small x; and like |x| for large x. @@ -110,7 +142,7 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: return elements -class CrossEntropyLoss(LossFunction): +class CrossEntropyLoss(StandardLossFunction): """Compute cross-entropy loss for classification tasks. Predictions are an [N, num_class]-matrix of logits (i.e., non-softmax'ed @@ -193,7 +225,7 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: return self._loss(prediction.float(), target_one_hot.float()) -class BinaryCrossEntropyLoss(LossFunction): +class BinaryCrossEntropyLoss(StandardLossFunction): """Compute binary cross entropy loss. Predictions are vector probabilities (i.e., values between 0 and 1), and @@ -276,7 +308,7 @@ def backward( ) -class VonMisesFisherLoss(LossFunction): +class VonMisesFisherLoss(StandardLossFunction): """General class for calculating von Mises-Fisher loss. Requires implementation for specific dimension `m` in which the target and @@ -399,7 +431,7 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: return self._evaluate(p, t) -class EuclideanDistanceLoss(LossFunction): +class EuclideanDistanceLoss(StandardLossFunction): """Mean squared error in three dimensions.""" def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: @@ -445,20 +477,16 @@ def _forward(self, prediction: Tensor, target: Tensor) -> Tensor: return self._evaluate(p, target) -class MultivariateGaussianFlowLoss(LossFunction): +class MultivariateGaussianFlowLoss(FlowLossFunction): """Loss for transforming input distribution to Multivariate Gaussian. 
 
     Intended for use by `NormalizingFlow`s.
     """
 
-    def __init__(self, **kwargs: Any) -> None:
-        """Construct `MultivariateGaussianFlowLoss`."""
-        super().__init__(**kwargs)
-        self._const = torch.tensor(2 * torch.pi, dtype=torch.float)
+    # Class variable
+    _const = torch.tensor(2 * torch.pi, dtype=torch.float)
 
-    def _forward(  # type: ignore
-        self, prediction: Tensor, jacobian: Tensor, target: Tensor
-    ) -> Tensor:
+    def _forward(self, prediction: Tensor, jacobian: Tensor) -> Tensor:
         assert prediction.shape == jacobian.shape
         loss = -torch.mean(
             torch.log(jacobian)
             - (torch.pow(prediction, 2) + torch.log(self._const)) / 2,
             dim=1,
         )
         return loss

From 9c432552d3d9aacca2e8fa784c1e1e17a35ab333 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Tue, 19 Dec 2023 11:50:53 +0100
Subject: [PATCH 11/22] edit checks in loss_function

---
 src/graphnet/models/standard_model.py   | 18 ++++++++++--------
 src/graphnet/models/task/task.py        |  4 ++--
 src/graphnet/training/loss_functions.py | 20 ++++++++++++--------
 3 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index fa7cf05f0..83ebf3106 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -272,13 +272,13 @@ def shared_step(self, batch: List[Data], batch_idx: int) -> Tensor:
         between the training and validation step.
         """
         if isinstance(self.backbone, NormalizingFlow):
-            preds, jacobian = self(batch)
+            predictions, jacobian = self(batch)
             loss = self.compute_loss(
-                preds=preds, data=batch, kwargs={"jacobian": jacobian}
+                predictions=predictions, data=batch, jacobian=jacobian
             )
         elif isinstance(self.backbone, GNN):
-            preds = self(batch)
-            loss = self.compute_loss(preds=preds, data=batch)
+            predictions = self(batch)
+            loss = self.compute_loss(predictions=predictions, data=batch)
         return loss
 
     def training_step(
@@ -319,10 +319,10 @@ def validation_step(
 
     def compute_loss(
         self,
-        preds: Union[List[Tensor], List[List[Tensor]]],
+        predictions: Union[List[Tensor], List[List[Tensor]]],
         data: List[Data],
         verbose: bool = False,
-        **kwargs: Any,
+        jacobian: Optional[Tensor] = None,
     ) -> Tensor:
         """Compute and sum losses across tasks."""
         data_merged = {}
@@ -336,9 +336,11 @@ def compute_loss(
         )
 
         losses = []
-        for task in self._tasks:
+        for i, task in enumerate(self._tasks):
             losses.append(
-                task.compute_loss(predictions=preds, data=data, **kwargs)
+                task.compute_loss(
+                    predictions=predictions[i], data=data, jacobian=jacobian
+                )
             )
         if verbose:
             self.info(f"{losses}")
diff --git a/src/graphnet/models/task/task.py b/src/graphnet/models/task/task.py
index 2534600bc..487971970 100644
--- a/src/graphnet/models/task/task.py
+++ b/src/graphnet/models/task/task.py
@@ -421,7 +421,7 @@ def forward(
 
     @final
     def compute_loss(
-        self, prediction: Tensor, jacobian: Tensor, data: Data
+        self, predictions: Tensor, jacobian: Tensor, data: Data
     ) -> Tensor:
         """Compute loss for normalizing flow tasks.
@@ -439,7 +439,7 @@ def compute_loss(
             weights = None
         loss = (
             self._loss_function(
-                prediction=prediction,
+                predictions=predictions,
                 jacobian=jacobian,
                 weights=weights,
                 target=None,
diff --git a/src/graphnet/training/loss_functions.py b/src/graphnet/training/loss_functions.py
index 7a26f7b8f..55fb61baa 100644
--- a/src/graphnet/training/loss_functions.py
+++ b/src/graphnet/training/loss_functions.py
@@ -34,7 +34,7 @@ def __init__(self, **kwargs: Any) -> None:
     @final
     def forward(  # type: ignore[override]
         self,
-        prediction: Tensor,
+        predictions: Tensor,
         target: Tensor,
         weights: Optional[Tensor] = None,
         return_elements: bool = False,
@@ -55,17 +55,21 @@ def forward(  # type: ignore[override]
         """
         # Toggle between LossFunction types
         if isinstance(self, FlowLossFunction):
-            elements = self._forward(
-                prediction=prediction, target=target, jacobian=jacobian
-            )
+            elements = self._forward(prediction=predictions, jacobian=jacobian)
         elif isinstance(self, StandardLossFunction):
-            elements = self._forward(prediction=prediction, target=target)
+            elements = self._forward(prediction=predictions, target=target)
         else:
             raise NotImplementedError(
                 f"{self.__class__.__name__} is not supported by `StandardModel`"
             )
 
         if weights is not None:
             elements = elements * weights
-        assert elements.size(dim=0) == target.size(
-            dim=0
-        ), "`_forward` should return elementwise loss terms."
+        # checks
+        if target is not None:
+            assert elements.size(dim=0) == target.size(
+                dim=0
+            ), "`_forward` should return elementwise loss terms."
+        elif jacobian is not None:
+            assert elements.size(dim=0) == jacobian.size(
+                dim=0
+            ), "`_forward` should return elementwise loss terms."
 
         return elements if return_elements else torch.mean(elements)

From 1083f13b7f3cac43a46c45cd6ecaf82607259037 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Wed, 20 Dec 2023 15:04:36 +0100
Subject: [PATCH 12/22] Make output of NormalizingFlow a single tensor

---
 .../04_training/05_train_normalizing_flow.py  | 11 ++--
 src/graphnet/models/flows/inga.py             |  5 +-
 src/graphnet/models/flows/normalizing_flow.py | 27 +++++++++
 src/graphnet/models/standard_model.py         | 49 +++++----------
 src/graphnet/models/task/task.py              | 58 +++++++++++++------
 5 files changed, 93 insertions(+), 57 deletions(-)

diff --git a/examples/04_training/05_train_normalizing_flow.py b/examples/04_training/05_train_normalizing_flow.py
index f0a3daab0..116fe0b17 100644
--- a/examples/04_training/05_train_normalizing_flow.py
+++ b/examples/04_training/05_train_normalizing_flow.py
@@ -92,8 +92,9 @@ def main(
     )
     task = StandardFlowTask(
         target_labels=graph_definition.output_feature_names,
-        prediction_labels=graph_definition.output_feature_names,
         loss_function=MultivariateGaussianFlowLoss(),
+        coordinate_columns=flow.coordinate_columns,
+        jacobian_columns=flow.jacobian_columns,
     )
     model = StandardModel(
         graph_definition=graph_definition,
@@ -137,10 +138,10 @@ def main(
     target = ""
     truth_table = "mc_truth"
     gpus = None
-    max_epochs = 400
-    early_stopping_patience = 16
-    batch_size = 500
-    num_workers = 30
+    max_epochs = 100
+    early_stopping_patience = 5
+    batch_size = 16
+    num_workers = 1
     string_selection = [83.0, 84.0, 85.0, 86.0]
 
     string_mask = []
diff --git a/src/graphnet/models/flows/inga.py b/src/graphnet/models/flows/inga.py
index b48a94d89..a1082a123 100644
--- a/src/graphnet/models/flows/inga.py
+++ b/src/graphnet/models/flows/inga.py
@@ -4,6 +4,7 @@
 """
 from typing import List, Tuple
 
+import numpy as np
 import torch
 
 from graphnet.models.flows import NormalizingFlow
 from graphnet.models.flows.spline_blocks import (
     SplineBlock,
     TwoPartitionSplineBlock,
 )
 from torch_geometric.data import Data
@@ -46,6 +47,8 @@ def __init__(
                 Defaults to None, which will create an even partition.
             c: Scaling parameter for the neural network.
         """
+        self._coordinate_columns = np.arange(0, nb_inputs).tolist()
+        self._jacobian_columns = np.arange(nb_inputs, 2 * nb_inputs).tolist()
         super().__init__(nb_inputs)
 
         # Set Member variables
@@ -118,7 +121,7 @@ def forward(self, data: Data) -> Tuple[torch.Tensor, torch.Tensor]:
                 y, partition_jacobian = spline_block(x=y)
                 global_jacobian *= partition_jacobian
             c += 1
-        return y, global_jacobian
+        return torch.concat([y, global_jacobian], dim=1)
 
     def inverse(self, y: torch.Tensor) -> torch.Tensor:
         """Inverse call.
diff --git a/src/graphnet/models/flows/normalizing_flow.py b/src/graphnet/models/flows/normalizing_flow.py
index 0efbb5601..8067b81ee 100644
--- a/src/graphnet/models/flows/normalizing_flow.py
+++ b/src/graphnet/models/flows/normalizing_flow.py
@@ -1,6 +1,7 @@
 """Base flow-specific `Model` class(es)."""
 
 from abc import abstractmethod
+from typing import List
 
 from torch import Tensor
 from torch_geometric.data import Data
@@ -30,6 +31,32 @@ def nb_outputs(self) -> int:
         """Return number of output features."""
         return self._nb_outputs
 
+    @property
+    def coordinate_columns(self) -> List[int]:
+        """Return the coordinate column indices.
+
+        `NormalizingFlow` returns a tensor of shape
+        [n_samples, n_coordinate_columns + n_jacobian_columns].
+
+        The coordinate column indices are used to slice the tensor `x`
+        from `NormalizingFlow.forward()` such that x[:, coordinate_columns]
+        returns a Tensor containing only the coordinates.
+        """
+        return self._coordinate_columns
+
+    @property
+    def jacobian_columns(self) -> List[int]:
+        """Return the Jacobian column indices.
+
+        `NormalizingFlow` returns a tensor of shape
+        [n_samples, n_coordinate_columns + n_jacobian_columns].
+
+        The Jacobian column indices are used to slice the tensor `x`
+        from `NormalizingFlow.forward()` such that x[:, jacobian_columns]
+        returns a Tensor containing only the Jacobian.
+        """
+        return self._jacobian_columns
+
     @abstractmethod
     def forward(self, data: Data) -> Tensor:
         """Transform from input distribution into latent distribution."""
diff --git a/src/graphnet/models/standard_model.py b/src/graphnet/models/standard_model.py
index 83ebf3106..4fbfc1a59 100644
--- a/src/graphnet/models/standard_model.py
+++ b/src/graphnet/models/standard_model.py
@@ -238,30 +238,25 @@ def configure_optimizers(self) -> Dict[str, Any]:
 
     def forward(
         self, data: Union[Data, List[Data]]
-    ) -> Union[List[Union[Tensor, Data]], Tuple[Tensor, Tensor]]:
+    ) -> Union[Tensor, Tuple[Tensor, Tensor]]:
         """Forward pass, chaining model components."""
         if isinstance(data, Data):
             data = [data]
 
-        if isinstance(self.backbone, NormalizingFlow):
-            x_list = []
-            jacobian_list = []
-            for d in data:
-                x, jacobian = self.backbone(d)
-                x_list.append(x)
-                jacobian_list.append(jacobian)
-            x = torch.cat(x_list, dim=0)
-            jac = torch.cat(jacobian_list, dim=0)
-            preds = [task(x, jac) for task in self._tasks]
-            return preds, jac
-
-        elif isinstance(self.backbone, GNN):
-            x_list = []
-            for d in data:
-                x = self.backbone(d)
-                x_list.append(x)
-            x = torch.cat(x_list, dim=0)
-
-            preds = [task(x) for task in self._tasks]
-            return preds
+        x_list = []
+        for d in data:
+            x = self.backbone(d)
+            x_list.append(x)
+        x = torch.cat(x_list, dim=0)
+
+        preds = [task(x) for task in self._tasks]
+        return preds
 
     def shared_step(self, batch: List[Data], batch_idx: int) -> Tensor:
         """Perform shared step.
 
         Applies the forward pass and the following loss calculation, shared
         between the training and validation step.
""" - if isinstance(self.backbone, NormalizingFlow): - predictions, jacobian = self(batch) - loss = self.compute_loss( - predictions=predictions, data=batch, jacobian=jacobian - ) - elif isinstance(self.backbone, GNN): - predictions = self(batch) - loss = self.compute_loss(predictions=predictions, data=batch) + predictions = self(batch) + loss = self.compute_loss(predictions=predictions, data=batch) return loss def training_step( @@ -322,9 +303,9 @@ def compute_loss( predictions: Union[List[Tensor], List[List[Tensor]]], data: List[Data], verbose: bool = False, - jacobian: Optional[Tensor] = None, ) -> Tensor: """Compute and sum losses across tasks.""" + # Prepare a common set of truth variables passed to all Tasks data_merged = {} target_labels_merged = list(set(self.target_labels)) for label in target_labels_merged: @@ -335,13 +316,13 @@ def compute_loss( [d[task._loss_weight] for d in data], dim=0 ) + # Loop over Tasks and calculate loss for each losses = [] for i, task in enumerate(self._tasks): - losses.append( - task.compute_loss( - predictions=predictions[i], data=data, jacobian=jacobian - ) + task_loss = task.compute_loss( + predictions=predictions[i], data=data_merged ) + losses.append(task_loss) if verbose: self.info(f"{losses}") assert all( diff --git a/src/graphnet/models/task/task.py b/src/graphnet/models/task/task.py index 487971970..7bdee55f3 100644 --- a/src/graphnet/models/task/task.py +++ b/src/graphnet/models/task/task.py @@ -252,7 +252,9 @@ def _forward( # type: ignore raise NotImplementedError @abstractmethod - def compute_loss(self, pred: Union[Tensor, Data], data: Data) -> Tensor: + def compute_loss( + self, predictions: Union[Tensor, Data], data: Data + ) -> Tensor: """Compute loss of `pred` wrt. target labels in `data`. @@ -311,7 +313,7 @@ def _forward(self, x: Union[Tensor, Data]) -> Union[Tensor, Data]: @final def compute_loss( - self, prediction: Union[Tensor, Data], data: Data + self, predictions: Union[Tensor, Data], data: Data ) -> Tensor: """Compute supervised learning loss. @@ -328,7 +330,7 @@ def compute_loss( weights = None loss = ( self._loss_function( - prediction=prediction, target=target, weights=weights + predictions=predictions, target=target, weights=weights ) + self._regularisation_loss ) @@ -389,16 +391,28 @@ class StandardFlowTask(Task): def __init__( self, target_labels: List[str], + coordinate_columns: List[int], + jacobian_columns: List[int], **task_kwargs: Any, ): - """Construct `StandardLearnedTask`. + """Construct `StandardFlowTask`. Args: target_labels: A list of names for the targets of this Task. hidden_size: The number of columns in the output of the last latent layer of `Model` using this Task. Available through `Model.nb_outputs` + coordinate_columns: Indices for columns in input tensor `x` that + represents coordinates in internal, latent + distribution. + jacobian_columns: Indices for columns in input tensor `x` that + represents jacobian. """ + self._default_prediction_labels = self._make_prediction_labels( + target_labels + ) + self._coordinate_columns = coordinate_columns + self._jacobian_columns = jacobian_columns # Base class constructor super().__init__(target_labels=target_labels, **task_kwargs) @@ -406,28 +420,35 @@ def nb_inputs(self) -> int: """Return number of inputs assumed by task.""" return len(self._target_labels) - def _forward(self, x: Tensor, jacobian: Tensor) -> Tensor: # type: ignore - # Leave it as is. 
-        return x
+    @abstractmethod
+    def _forward(self, x: Tensor, jacobian: Tensor) -> Tensor:
+        # do nothing
+        return torch.cat([x, jacobian], dim=1)
 
     @final
-    def forward(
-        self, x: Union[Tensor, Data], jacobian: Optional[Tensor]
-    ) -> Union[Tensor, Data]:
+    def forward(self, x: Union[Tensor, Data]) -> Union[Tensor, Data]:
         """Forward pass."""
         self._regularisation_loss = 0  # Reset
-        x = self._forward(x, jacobian)
-        return self._transform_prediction(x)
+        y = self._forward(
+            x=x[:, self._coordinate_columns],
+            jacobian=x[:, self._jacobian_columns],
+        )
+        return self._transform_prediction(y)
+
+    def _make_prediction_labels(self, target_labels: List[str]) -> List[str]:
+        jacs = []
+        x_tilde = []
+        for label in target_labels:
+            x_tilde.append(label + "_tilde")
+            jacs.append(label + "_jac")
+        return x_tilde + jacs
 
     @final
-    def compute_loss(
-        self, predictions: Tensor, jacobian: Tensor, data: Data
-    ) -> Tensor:
+    def compute_loss(self, predictions: Tensor, data: Data) -> Tensor:
         """Compute loss for normalizing flow tasks.
 
         Args:
             prediction: transformed sample in latent distribution space.
-            jacobian: the jacobian associated with the transformation.
             data: the graph object.
 
         Returns:
             loss
         """
         if self._loss_weight is not None:
             weights = data[self._loss_weight]
         else:
             weights = None
+
+        pred = predictions[:, self._coordinate_columns]
+        jacobian = predictions[:, self._jacobian_columns]
         loss = (
             self._loss_function(
-                predictions=predictions,
+                predictions=pred,
                 jacobian=jacobian,
                 weights=weights,
                 target=None,
             )
             + self._regularisation_loss
         )
         return loss

From 48b4a19717dc2f40930b2b7fced589db5868593f Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Wed, 20 Dec 2023 15:18:58 +0100
Subject: [PATCH 13/22] update example

---
 .../04_training/05_train_normalizing_flow.py | 85 +++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)

diff --git a/examples/04_training/05_train_normalizing_flow.py b/examples/04_training/05_train_normalizing_flow.py
index 116fe0b17..9f08a0f4c 100644
--- a/examples/04_training/05_train_normalizing_flow.py
+++ b/examples/04_training/05_train_normalizing_flow.py
@@ -38,6 +38,7 @@ def main(
     early_stopping_patience: int,
     batch_size: int,
     num_workers: int,
+    wandb: bool = False,
 ) -> None:
     """Run example."""
     # Construct Logger
@@ -133,32 +134,68 @@ def main(
 
 
 if __name__ == "__main__":
-    database = "/home/iwsatlas1/oersoe/github/graphnet/data/examples/sqlite/prometheus/prometheus-events.db"
-    pulsemap = "total"
-    target = ""
-    truth_table = "mc_truth"
-    gpus = None
-    max_epochs = 100
-    early_stopping_patience = 5
-    batch_size = 16
-    num_workers = 1
-    string_selection = [83.0, 84.0, 85.0, 86.0]
-
-    string_mask = []
-    for string in np.arange(0, 87):
-        if string not in string_selection:
-            string_mask.append(string)
+
+    # Parse command-line arguments
+    parser = ArgumentParser(
+        description="""
+Train a normalizing flow without the use of config files.
+"""
+    )
+
+    parser.add_argument(
+        "--path",
+        help="Path to dataset file (default: %(default)s)",
+        default=f"{EXAMPLE_DATA_DIR}/sqlite/prometheus/prometheus-events.db",
+    )
+
+    parser.add_argument(
+        "--pulsemap",
+        help="Name of pulsemap to use (default: %(default)s)",
+        default="total",
+    )
+
+    parser.add_argument(
+        "--target",
+        help=(
+            "Name of feature to use as regression target (default: "
+            "%(default)s)"
+        ),
+        default="total_energy",
+    )
+
+    parser.add_argument(
+        "--truth-table",
+        help="Name of truth table to be used (default: %(default)s)",
+        default="mc_truth",
+    )
+
+    parser.with_standard_arguments(
+        "gpus",
+        ("max-epochs", 1),
+        "early-stopping-patience",
+        ("batch-size", 16),
+        "num-workers",
+    )
+
+    parser.add_argument(
+        "--wandb",
+        action="store_true",
+        help="If True, Weights & Biases are used to track the experiment.",
+    )
+
+    args, unknown = parser.parse_known_args()
 
     main(
-        path=database,
-        pulsemap=pulsemap,
-        target=target,
-        truth_table=truth_table,
-        gpus=gpus,
-        max_epochs=max_epochs,
-        early_stopping_patience=early_stopping_patience,
-        batch_size=batch_size,
-        num_workers=num_workers,
+        args.path,
+        args.pulsemap,
+        args.target,
+        args.truth_table,
+        args.gpus,
+        args.max_epochs,
+        args.early_stopping_patience,
+        args.batch_size,
+        args.num_workers,
+        args.wandb,
     )

From 7b4b16af252ac0993655876304f8a7ea9f817a1c Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Wed, 20 Dec 2023 15:25:32 +0100
Subject: [PATCH 14/22] remove misplaced abstractmethod decorator

---
 src/graphnet/models/task/task.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/graphnet/models/task/task.py b/src/graphnet/models/task/task.py
index 7bdee55f3..4f7c7589d 100644
--- a/src/graphnet/models/task/task.py
+++ b/src/graphnet/models/task/task.py
@@ -420,7 +420,6 @@ def nb_inputs(self) -> int:
         """Return number of inputs assumed by task."""
         return len(self._target_labels)
 
-    @abstractmethod
     def _forward(self, x: Tensor, jacobian: Tensor) -> Tensor:
         # do nothing
         return torch.cat([x, jacobian], dim=1)

From 6b0a55b70912caa7b26b6e67839c213e83ce74a1 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Wed, 20 Dec 2023 15:55:28 +0100
Subject: [PATCH 15/22] remove unused import

---
 src/graphnet/training/loss_functions.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/graphnet/training/loss_functions.py b/src/graphnet/training/loss_functions.py
index 55fb61baa..9888ab72f 100644
--- a/src/graphnet/training/loss_functions.py
+++ b/src/graphnet/training/loss_functions.py
@@ -6,7 +6,6 @@
 
 from abc import abstractmethod
 from typing import Any, Optional, Union, List, Dict
-import inspect
 
 import numpy as np
 import scipy.special

From 14f71cb8a693654db24dfddb5770db21b5ecb2bf Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 26 Jan 2024 13:39:04 +0100
Subject: [PATCH 16/22] remove commented-out lines of code in spline_blocks.py

---
 src/graphnet/models/flows/spline_blocks.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/src/graphnet/models/flows/spline_blocks.py b/src/graphnet/models/flows/spline_blocks.py
index 041421cbd..b08a4ec9b 100644
--- a/src/graphnet/models/flows/spline_blocks.py
+++ b/src/graphnet/models/flows/spline_blocks.py
@@ -300,16 +300,10 @@ def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
         x_1 = x[:, self.partition[1]]
 
         spline_params_1 = self.nn_0(x_0) / 100
-        # if torch.sum(spline_params_1.isnan()==True) > 1:
-        # self.error("NaN encountered in spline parameterization!")
-        # assert torch.sum(spline_params_1.isnan()==True) == 0
             x=x_1, spline_parameters=spline_params_1
         )
 
         spline_params_0 = self.nn_1(y_1) / 100
-        # if torch.sum(spline_params_0.isnan()==True) > 1:
-        #     self.error("NaN encountered in spline parameterization!")
-        # assert torch.sum(spline_params_0.isnan()==True) == 0
         y_0, jacobian_0 = self.apply_splines_to_each_dimension(
             x=x_0, spline_parameters=spline_params_0
         )

From 966ddec184ae625343e3d66c99eabe932c667c1c Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:25:04 +0100
Subject: [PATCH 17/22] remove unused imports

---
 examples/04_training/05_train_normalizing_flow.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/examples/04_training/05_train_normalizing_flow.py b/examples/04_training/05_train_normalizing_flow.py
index 9f08a0f4c..d838b0d51 100644
--- a/examples/04_training/05_train_normalizing_flow.py
+++ b/examples/04_training/05_train_normalizing_flow.py
@@ -3,10 +3,6 @@
 import os
 from typing import Any, Dict, List, Optional
 
-from pytorch_lightning.loggers import WandbLogger
-import numpy as np
-import pandas as pd
-
 from graphnet.constants import EXAMPLE_DATA_DIR, EXAMPLE_OUTPUT_DIR
 from graphnet.data.constants import FEATURES, TRUTH
 from graphnet.models import StandardModel

From 87ed567f5b829b86a2a83a37930535018fa5cc76 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:28:14 +0100
Subject: [PATCH 18/22] remove self._eps

---
 src/graphnet/models/flows/spline_blocks.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/graphnet/models/flows/spline_blocks.py b/src/graphnet/models/flows/spline_blocks.py
index b08a4ec9b..67fc9d26d 100644
--- a/src/graphnet/models/flows/spline_blocks.py
+++ b/src/graphnet/models/flows/spline_blocks.py
@@ -22,7 +22,6 @@ def __init__(self, n_knots: int, b: int):
         self.b = b
         self._softmax = torch.nn.Softmax(dim=1)
         self._softplus = torch.nn.Softplus()
-        self._eps = 0
 
     def forward_spline(
         self,
@@ -219,12 +218,12 @@ def _setup_coordinates(
         knot_x = self._transform_to_internal_coordinates(knot_x_bins)
         knot_y = self._transform_to_internal_coordinates(knot_y_bins)
         knot_x = torch.nn.functional.pad(knot_x, (1, 0))
-        knot_x[:, 0] = -self.b - self._eps
-        knot_x[:, -1] = knot_x[:, -1] + self._eps
+        knot_x[:, 0] = -self.b
+        knot_x[:, -1] = knot_x[:, -1]
 
         knot_y = torch.nn.functional.pad(knot_y, (1, 0))
-        knot_y[:, 0] = -self.b - self._eps
-        knot_y[:, -1] = knot_y[:, -1] + self._eps
+        knot_y[:, 0] = -self.b
+        knot_y[:, -1] = knot_y[:, -1]
 
         d = self._softplus(d)
         d = torch.nn.functional.pad(d, (1, 1), value=1.0)

From 13869141e04fadab0a32e80f74ba41cecb351362 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:32:30 +0100
Subject: [PATCH 19/22] Remove assert

---
 src/graphnet/models/flows/spline_blocks.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/graphnet/models/flows/spline_blocks.py b/src/graphnet/models/flows/spline_blocks.py
index 67fc9d26d..36d578e32 100644
--- a/src/graphnet/models/flows/spline_blocks.py
+++ b/src/graphnet/models/flows/spline_blocks.py
@@ -74,9 +74,6 @@ def inverse_spline(
         # Calculate knot index `k` and s
         k = self._find_spline_idx(knot_y, y)
-        assert (
-            max(k) + 1 <= knot_y.shape[1]
-        ), f"""{knot_y.shape} vs.
-        {max(k) + 1}"""
         s = self._calculate_s(knot_y, knot_x, k)
 
         # Calculate coefficients a, b and c from paper

From 9db41d3bf30ec68a76a2f45eb5bb9fc613a06429 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:39:49 +0100
Subject: [PATCH 20/22] remove comment

---
 src/graphnet/models/flows/inga.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/graphnet/models/flows/inga.py b/src/graphnet/models/flows/inga.py
index a1082a123..fb725d388 100644
--- a/src/graphnet/models/flows/inga.py
+++ b/src/graphnet/models/flows/inga.py
@@ -112,7 +112,6 @@ def forward(self, data: Data) -> Tuple[torch.Tensor, torch.Tensor]:
         c = 0
         x = data.x
         for spline_block in self.spline_blocks:
-            # self.info(f"spline block {c}")
             if is_first:
                 y, partition_jacobian = spline_block(x=x)
                 global_jacobian = partition_jacobian

From e44804a80a3adc1dfde359455ddc7400fdd3040a Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:40:14 +0100
Subject: [PATCH 21/22] remove unused import

---
 src/graphnet/models/flows/inga.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/graphnet/models/flows/inga.py b/src/graphnet/models/flows/inga.py
index fb725d388..d98c81eea 100644
--- a/src/graphnet/models/flows/inga.py
+++ b/src/graphnet/models/flows/inga.py
@@ -9,7 +9,6 @@
 
 from graphnet.models.flows import NormalizingFlow
 from graphnet.models.flows.spline_blocks import (
-    SplineBlock,
     TwoPartitionSplineBlock,
 )
 from torch_geometric.data import Data

From 4ae5f1ba6b4b1139a852a8585174862935048ec0 Mon Sep 17 00:00:00 2001
From: Rasmus Oersoe
Date: Fri, 23 Feb 2024 12:47:19 +0100
Subject: [PATCH 22/22] set default value for `target` in loss function

---
 src/graphnet/models/task/task.py        | 1 -
 src/graphnet/training/loss_functions.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/graphnet/models/task/task.py b/src/graphnet/models/task/task.py
index 4f7c7589d..b010cbb3b 100644
--- a/src/graphnet/models/task/task.py
+++ b/src/graphnet/models/task/task.py
@@ -465,7 +465,6 @@ def compute_loss(self, predictions: Tensor, data: Data) -> Tensor:
                 predictions=pred,
                 jacobian=jacobian,
                 weights=weights,
-                target=None,
             )
             + self._regularisation_loss
         )
diff --git a/src/graphnet/training/loss_functions.py b/src/graphnet/training/loss_functions.py
index 9888ab72f..24645ed93 100644
--- a/src/graphnet/training/loss_functions.py
+++ b/src/graphnet/training/loss_functions.py
@@ -34,7 +34,7 @@ def __init__(self, **kwargs: Any) -> None:
     def forward(  # type: ignore[override]
         self,
         predictions: Tensor,
-        target: Tensor,
+        target: Tensor = None,
         weights: Optional[Tensor] = None,
         return_elements: bool = False,
         jacobian: Tensor = None,
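
Editor's note (not part of the patch series): the convention these patches establish is that a normalizing-flow task returns its latent coordinates and per-dimension Jacobian factors stacked column-wise (labels `<target>_tilde` followed by `<target>_jac`, see `_make_prediction_labels` in the task.py hunk above), and that `compute_loss` splits this block again before calling a loss function whose `target` now defaults to `None` (PATCH 22/22). Below is a minimal, self-contained sketch of that round trip; the column indices, the toy tensors and the `nll` helper are illustrative assumptions, not GraphNeT API.

import torch
from torch import Tensor

# Assumed layout for two targets: [e_tilde, z_tilde, e_jac, z_jac].
coordinate_columns = [0, 1]
jacobian_columns = [2, 3]


def nll(
    predictions: Tensor,
    jacobian: Tensor,
    target: Tensor = None,
    weights: Tensor = None,
) -> Tensor:
    """Toy negative log-likelihood under a standard-normal base density.

    `target` defaults to None, mirroring PATCH 22/22: a flow loss needs
    only the latent sample and the Jacobian factors, not truth labels.
    """
    log_det = torch.log(jacobian).sum(dim=1)  # log|det J| = sum_i log j_i
    log_prob = -0.5 * (predictions**2).sum(dim=1) + log_det
    loss = -log_prob
    if weights is not None:
        loss = loss * weights
    return loss.mean()


y = torch.randn(16, 2)                 # latent coordinates, one row per event
jac = torch.rand(16, 2) + 0.1          # positive per-dimension factors
stacked = torch.cat([y, jac], dim=1)   # what the task's `_forward` emits

pred = stacked[:, coordinate_columns]      # split as in `compute_loss`
jacobian = stacked[:, jacobian_columns]
print(nll(predictions=pred, jacobian=jacobian))  # `target` left at None

With the CLI introduced in PATCH 13/22, the example script would then be launched along the lines of `python examples/04_training/05_train_normalizing_flow.py --target total_energy --wandb`; the flag names are taken from the parser definitions above, while the exact invocation is an assumption.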