From 26edfda5160bda9b0cc2258314818b749b485977 Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Fri, 6 Sep 2024 13:45:51 +0200 Subject: [PATCH 1/6] Remove unnecessary np.copy Slicing with a boolean array already returns a copy --- flarestack/core/injector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flarestack/core/injector.py b/flarestack/core/injector.py index 7e17fed7..c7d6ff23 100644 --- a/flarestack/core/injector.py +++ b/flarestack/core/injector.py @@ -128,7 +128,7 @@ def get_n_exp_single(self, source): else: name = source["source_name"] - return np.copy(self.n_exp[self.n_exp["source_name"] == name]) + return self.n_exp[self.n_exp["source_name"] == name] def get_expectation(self, source, scale): return float(self.get_n_exp_single(source)["n_exp"]) * scale @@ -260,7 +260,7 @@ def select_mc_band(self, source): band_mask = self.get_band_mask(source, min_dec, max_dec) - return np.copy(self._mc[band_mask]), omega, band_mask + return self._mc[band_mask], omega, band_mask def get_band_mask(self, source, min_dec, max_dec): # Checks if the mask has already been evaluated for the source From 1ba3f204acb82520d9d49e5a531b282c72016a46 Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Fri, 6 Sep 2024 14:42:05 +0200 Subject: [PATCH 2/6] Add TableInjector Bandmasks are completely unncessary data are ordered in declination; binary search is cheap. Speeds up injector initialization by a factor ~60 with 1000 sources. --- flarestack/core/injector.py | 34 ++++++++++++++++++++++++++++++++- flarestack/core/minimisation.py | 2 +- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/flarestack/core/injector.py b/flarestack/core/injector.py index c7d6ff23..6978a9ef 100644 --- a/flarestack/core/injector.py +++ b/flarestack/core/injector.py @@ -4,6 +4,7 @@ import random import zipfile import zlib +from astropy.table import Table from flarestack.shared import band_mask_cache_name from flarestack.core.energy_pdf import EnergyPDF, read_e_pdf_dict from flarestack.core.time_pdf import TimePDF, read_t_pdf_dict @@ -228,7 +229,7 @@ class MCInjector(BaseInjector): def __init__(self, season, sources, **kwargs): kwargs = read_injector_dict(kwargs) - self._mc = season.get_mc() + self._mc = self.get_mc(season) BaseInjector.__init__(self, season, sources, **kwargs) self.injection_declination_bandwidth = self.inj_kwargs.pop( @@ -243,6 +244,9 @@ def __init__(self, season, sources, **kwargs): logger.warning("No Injection Arguments. Are you unblinding?") pass + def get_mc(self, season): + return season.get_mc() + def select_mc_band(self, source): """For a given source, selects MC events within a declination band of width +/- 5 degrees that contains the source. Then returns the MC data @@ -522,6 +526,34 @@ def get_band_mask(self, source, min_dec, max_dec): return self.band_mask_cache.getrow(entry["source_index"][0]).toarray()[0] +@MCInjector.register_subclass("table_injector") +class TableInjector(MCInjector): + """ + For even larger numbers of sources O(~1000), accessing every element of the + MC array in select_band_mask() once for every source in calculate_n_exp() + becomes a bottleneck. Store event fields in columns, ordered by declination, + making single-field accesses vastly cheaper and band masks practically free. + For 1000 sources, calculate_n_exp() is ~60x faster than MCInjector. + """ + + def get_mc(self, season): + mc: np.ndarray = season.get_mc() + # Sort rows by trueDec, and store as columns in a Table + table = Table(mc[np.argsort(mc["trueDec"].copy())]) + # Prevent in-place modifications + for k in table.columns: + table[k].setflags(write=False) + return table + + def get_band_mask(self, source, min_dec, max_dec): + return slice(*np.searchsorted(self._mc["trueDec"], [min_dec, max_dec])) + + def select_mc_band(self, source): + table, omega, band_mask = super().select_mc_band(source) + # allow individual columns to be replaced + return table.copy(copy_data=False), omega, band_mask + + @MCInjector.register_subclass("effective_area_injector") class EffectiveAreaInjector(BaseInjector): """Class for injecting signal events by relying on effective areas rather diff --git a/flarestack/core/minimisation.py b/flarestack/core/minimisation.py index bc847529..bfa6038d 100644 --- a/flarestack/core/minimisation.py +++ b/flarestack/core/minimisation.py @@ -1267,7 +1267,7 @@ class LargeCatalogueMinimisationHandler(FixedWeightMinimisationHandler): compatible_llh = ["standard_matrix", "std_matrix_kde_enabled"] compatible_negative_n_s = False - compatible_injectors = ["low_memory_injector"] + compatible_injectors = ["low_memory_injector", "table_injector"] def __init__(self, mh_dict): FixedWeightMinimisationHandler.__init__(self, mh_dict) From 2e48a3e4298f9b55cb5df2e88b8d5f80fcf62ddf Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Fri, 6 Sep 2024 17:24:51 +0200 Subject: [PATCH 3/6] Order data by dec in StdMatrixKDEEnabledLLH --- flarestack/core/llh.py | 46 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/flarestack/core/llh.py b/flarestack/core/llh.py index 202c5586..5a4955dd 100644 --- a/flarestack/core/llh.py +++ b/flarestack/core/llh.py @@ -4,6 +4,7 @@ import numpy as np import scipy.interpolate from scipy import sparse +from astropy.table import Table from typing import Optional import pickle from flarestack.shared import ( @@ -1370,6 +1371,37 @@ def __init__(self, season, sources, llh_dict): + "please change 'the spatial_pdf_name' accordingly" ) + def get_dec_ra_masks(self, data, source) -> "tuple[slice, np.ndarray]": + """ + Get spatially coincident data for a single source, taking advantage of + the fact that data are sorted in dec + """ + width = np.deg2rad(self.spatial_box_width) + + # Sets a declination band 5 degrees above and below the source + min_dec = max(-np.pi / 2.0, source["dec_rad"] - width) + max_dec = min(np.pi / 2.0, source["dec_rad"] + width) + + # Accepts events lying within a 5 degree band of the source + dec_mask = slice(*np.searchsorted(data["dec"], [min_dec, max_dec])) + + # Sets the minimum value of cos(dec) + cos_factor = np.amin(np.cos([min_dec, max_dec])) + + # Scales the width of the box in ra, to give a roughly constant + # area. However, if the width would have to be greater that +/- pi, + # then sets the area to be exactly 2 pi. + dPhi = np.amin([2.0 * np.pi, 2.0 * width / cos_factor]) + + # Accounts for wrapping effects at ra=0, calculates the distance + # of each event to the source. + ra_dist = np.fabs( + (data["ra"][dec_mask] - source["ra_rad"] + np.pi) % (2.0 * np.pi) - np.pi + ) + ra_mask = ra_dist < dPhi / 2.0 + + return dec_mask, ra_mask + def create_kwargs(self, data, pull_corrector, weight_f=None): if weight_f is None: raise Exception( @@ -1377,6 +1409,8 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): "standard_overlapping LLH functions." ) + data = Table(data[np.argsort(data["dec"])]) + coincidence_matrix = sparse.lil_matrix( (len(self.sources), len(data)), dtype=bool ) @@ -1388,9 +1422,8 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): sources = self.sources for i, source in enumerate(sources): - s_mask = self.select_spatially_coincident_data(data, [source]) - - coincident_data = data[s_mask] + dec_mask, ra_mask = self.get_dec_ra_masks(data, source) + coincident_data = data[dec_mask][ra_mask] if len(coincident_data) > 0: # Only bother accepting neutrinos where the spacial @@ -1416,9 +1449,11 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): sig = self.signal_pdf(source, coincident_data, gamma=2.0) nonzero_mask = sig > spatial_mask_threshold - s_mask[s_mask] *= nonzero_mask + ra_mask[ra_mask] *= nonzero_mask - coincidence_matrix[i] = s_mask + coincidence_matrix[i, dec_mask.start : dec_mask.stop] = ra_mask + + coincidence_matrix = coincidence_matrix.tocsr() # Using Sparse matrixes coincident_nu_mask = np.sum(coincidence_matrix, axis=0) > 0 @@ -1429,7 +1464,6 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): coincidence_matrix = ( coincidence_matrix[coincident_source_mask].T[coincident_nu_mask].T ) - coincidence_matrix.tocsr() coincident_data = data[coincident_nu_mask] coincident_sources = sources[coincident_source_mask] From a3c7993572c6ee4872ce7bc9c41aa4e145fbe22e Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Sat, 7 Sep 2024 15:27:04 +0200 Subject: [PATCH 4/6] Build coincidence matrix directly --- flarestack/core/llh.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/flarestack/core/llh.py b/flarestack/core/llh.py index 5a4955dd..f83589d1 100644 --- a/flarestack/core/llh.py +++ b/flarestack/core/llh.py @@ -1371,7 +1371,7 @@ def __init__(self, season, sources, llh_dict): + "please change 'the spatial_pdf_name' accordingly" ) - def get_dec_ra_masks(self, data, source) -> "tuple[slice, np.ndarray]": + def get_spatially_coincident_indices(self, data, source) -> np.ndarray: """ Get spatially coincident data for a single source, taking advantage of the fact that data are sorted in dec @@ -1383,7 +1383,7 @@ def get_dec_ra_masks(self, data, source) -> "tuple[slice, np.ndarray]": max_dec = min(np.pi / 2.0, source["dec_rad"] + width) # Accepts events lying within a 5 degree band of the source - dec_mask = slice(*np.searchsorted(data["dec"], [min_dec, max_dec])) + dec_range = slice(*np.searchsorted(data["dec"], [min_dec, max_dec])) # Sets the minimum value of cos(dec) cos_factor = np.amin(np.cos([min_dec, max_dec])) @@ -1396,11 +1396,9 @@ def get_dec_ra_masks(self, data, source) -> "tuple[slice, np.ndarray]": # Accounts for wrapping effects at ra=0, calculates the distance # of each event to the source. ra_dist = np.fabs( - (data["ra"][dec_mask] - source["ra_rad"] + np.pi) % (2.0 * np.pi) - np.pi + (data["ra"][dec_range] - source["ra_rad"] + np.pi) % (2.0 * np.pi) - np.pi ) - ra_mask = ra_dist < dPhi / 2.0 - - return dec_mask, ra_mask + return np.nonzero(ra_dist < dPhi / 2.0)[0] + dec_range.start def create_kwargs(self, data, pull_corrector, weight_f=None): if weight_f is None: @@ -1409,11 +1407,9 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): "standard_overlapping LLH functions." ) - data = Table(data[np.argsort(data["dec"])]) + data = Table(data[np.argsort(data[["dec", "ra"]])]) - coincidence_matrix = sparse.lil_matrix( - (len(self.sources), len(data)), dtype=bool - ) + coincidence_matrix_rows = [] kwargs = dict() @@ -1422,8 +1418,8 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): sources = self.sources for i, source in enumerate(sources): - dec_mask, ra_mask = self.get_dec_ra_masks(data, source) - coincident_data = data[dec_mask][ra_mask] + idx = self.get_spatially_coincident_indices(data, source) + coincident_data = data[idx] if len(coincident_data) > 0: # Only bother accepting neutrinos where the spacial @@ -1448,12 +1444,21 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): ): sig = self.signal_pdf(source, coincident_data, gamma=2.0) - nonzero_mask = sig > spatial_mask_threshold - ra_mask[ra_mask] *= nonzero_mask - - coincidence_matrix[i, dec_mask.start : dec_mask.stop] = ra_mask + nonzero_idx = np.nonzero(sig > spatial_mask_threshold)[0] + column_indices = idx[nonzero_idx] + + coincidence_matrix_rows.append( + sparse.csr_matrix( + ( + np.ones(column_indices.shape, dtype=bool), + column_indices, + [0, len(column_indices)], + ), + shape=(1, len(data)), + ) + ) - coincidence_matrix = coincidence_matrix.tocsr() + coincidence_matrix = sparse.vstack(coincidence_matrix_rows) # Using Sparse matrixes coincident_nu_mask = np.sum(coincidence_matrix, axis=0) > 0 From c2f35816551db9c64c7841702f0a7872f741b944 Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Mon, 9 Sep 2024 09:49:50 +0200 Subject: [PATCH 5/6] Skip coincidence matrix entirely csr_matrix.getrow() is suprisingly slow (lots of layers of indirection and argument checking), but also completely unnecessary, since coincidence_matrix and SoB_only_matrix have the same sparsity structure by construction. --- flarestack/core/llh.py | 93 +++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/flarestack/core/llh.py b/flarestack/core/llh.py index f83589d1..b9989bbc 100644 --- a/flarestack/core/llh.py +++ b/flarestack/core/llh.py @@ -1407,21 +1407,24 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): "standard_overlapping LLH functions." ) + # Keep data in an astropy Table (column-wise) to improve cache + # performance. Sort to allow get_spatially_coincident_indices to find + # declination bands by binary search. data = Table(data[np.argsort(data[["dec", "ra"]])]) - coincidence_matrix_rows = [] + SoB_rows = [None] * len(self.sources) kwargs = dict() kwargs["n_all"] = float(len(data)) - sources = self.sources - - for i, source in enumerate(sources): + # Treat sources in declination order to keep caches hot + order = np.argsort(self.sources[["dec_rad", "ra_rad"]]) + for i in order: + source = self.sources[i] idx = self.get_spatially_coincident_indices(data, source) - coincident_data = data[idx] - if len(coincident_data) > 0: + if len(idx) > 0: # Only bother accepting neutrinos where the spacial # likelihood is greater than 1e-21. This prevents 0s # appearing in dynamic pull corrections, but also speeds @@ -1432,6 +1435,7 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): # the spatial pdf is calculated (ie the KDE spline is evaluated) for gamma = 2 # If we want to be correct this should be in a loop for the get_gamma_support_points # but that would add an extra dimension in the matrix so better not + coincident_data = data[idx] if ( self.spatial_pdf.signal.SplineIs4D and self.spatial_pdf.signal.KDE_eval_gamma is not None @@ -1444,34 +1448,35 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): ): sig = self.signal_pdf(source, coincident_data, gamma=2.0) - nonzero_idx = np.nonzero(sig > spatial_mask_threshold)[0] + nonzero_idx = np.nonzero(sig > spatial_mask_threshold) column_indices = idx[nonzero_idx] - coincidence_matrix_rows.append( - sparse.csr_matrix( - ( - np.ones(column_indices.shape, dtype=bool), - column_indices, - [0, len(column_indices)], - ), - shape=(1, len(data)), - ) + # build a single-row CSR matrix in canonical format + SoB_rows[i] = sparse.csr_matrix( + ( + sig[nonzero_idx] + / self.background_pdf(source, coincident_data[nonzero_idx]), + column_indices, + [0, len(column_indices)], + ), + shape=(1, len(data)), ) + else: + SoB_rows[i] = sparse.csr_matrix((1, len(data)), dtype=float) - coincidence_matrix = sparse.vstack(coincidence_matrix_rows) + SoB_only_matrix = sparse.vstack(SoB_rows, format="csr") - # Using Sparse matrixes - coincident_nu_mask = np.sum(coincidence_matrix, axis=0) > 0 - coincident_nu_mask = np.array(coincident_nu_mask).ravel() - coincident_source_mask = np.sum(coincidence_matrix, axis=1) > 0 - coincident_source_mask = np.array(coincident_source_mask).ravel() + coincident_nu_mask = np.asarray(np.sum(SoB_only_matrix, axis=0) != 0).ravel() + coincident_source_mask = np.asarray( + np.sum(SoB_only_matrix, axis=1) != 0 + ).ravel() - coincidence_matrix = ( - coincidence_matrix[coincident_source_mask].T[coincident_nu_mask].T + SoB_only_matrix = ( + SoB_only_matrix[coincident_source_mask].T[coincident_nu_mask].T ) coincident_data = data[coincident_nu_mask] - coincident_sources = sources[coincident_source_mask] + coincident_sources = self.sources[coincident_source_mask] season_weight = lambda x: weight_f([1.0, x], self.season)[ coincident_source_mask @@ -1489,18 +1494,6 @@ def create_kwargs(self, data, pull_corrector, weight_f=None): "Creating gamma-independent SoB matrix for all srcs when 3D KDE or 4D w/ 'spatial_pdf_index'" ) - SoB_only_matrix = sparse.lil_matrix(coincidence_matrix.shape, dtype=float) - - for i, src in enumerate(coincident_sources): - mask = (coincidence_matrix.getrow(i)).toarray()[0] - SoB_only_matrix[i, mask] = self.signal_pdf( - src, coincident_data[mask] - ) / self.background_pdf( # gamma = None - src, coincident_data[mask] - ) - - SoB_only_matrix = SoB_only_matrix.tocsr() - def joint_SoB(dataset, gamma): weight = np.array(season_weight(gamma)) weight /= np.sum(weight) @@ -1516,20 +1509,26 @@ def joint_SoB(dataset, gamma): weight = np.array(season_weight(gamma)) weight /= np.sum(weight) - # create an empty lil_matrix (good for matrix creation) with shape - # of coincidence_matrix and type float - SoB_matrix_sparse = sparse.lil_matrix( - coincidence_matrix.shape, dtype=float - ) - + # Build CSR matrix containing source_weight * S / B, with the + # same sparsity structure as SoB_only_matrix, taking advantage + # of the fact that the column indices are indices into `dataset` + data = np.empty_like(SoB_only_matrix.data) for i, src in enumerate(coincident_sources): - mask = (coincidence_matrix.getrow(i)).toarray()[0] - SoB_matrix_sparse[i, mask] = ( + row = slice( + SoB_only_matrix.indptr[i], SoB_only_matrix.indptr[i + 1] + ) + masked_dataset = dataset[SoB_only_matrix.indices[row]] + data[row] = ( weight[i] - * self.signal_pdf(src, dataset[mask], gamma) - / self.background_pdf(src, dataset[mask]) + * self.signal_pdf(src, masked_dataset, gamma) + / self.background_pdf(src, masked_dataset) ) + SoB_matrix_sparse = sparse.csr_matrix( + (data, SoB_only_matrix.indices, SoB_only_matrix.indptr), + shape=SoB_only_matrix, + ) + SoB_sum = SoB_matrix_sparse.sum(axis=0) return_value = np.array(SoB_sum).ravel() From 345bea1b527d240ddfb705fc9773879be796d9be Mon Sep 17 00:00:00 2001 From: Jakob van Santen Date: Tue, 10 Sep 2024 17:45:00 +0200 Subject: [PATCH 6/6] mypy: ignore all of astropy --- mypy.ini | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/mypy.ini b/mypy.ini index 475ef4d5..5c550cd5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,13 +2,7 @@ packages = flarestack exclude = analyses -[mypy-astropy] -ignore_missing_imports = True - -[mypy-astropy.coordinates] -ignore_missing_imports = True - -[mypy-astropy.cosmology] +[mypy-astropy.*] ignore_missing_imports = True [mypy-healpy]