From 02618d6264dae5b1b71a06840f3829b2a08c8d09 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 25 Jul 2022 20:18:20 -0700 Subject: [PATCH] separated parallel routines into parallel.py, fixed readiness bug in subsampling in readiness --- descqa/configs/srv_ngals.yaml | 12 +- descqa/configs/srv_readiness.yaml | 9 +- descqa/parallel.py | 103 +++++++++++++++++ descqa/srv_ngals.py | 185 ++++++++++-------------------- descqa/srv_readiness_test.py | 32 +++--- descqa/srv_s1s2.py | 10 +- 6 files changed, 183 insertions(+), 168 deletions(-) create mode 100644 descqa/parallel.py diff --git a/descqa/configs/srv_ngals.yaml b/descqa/configs/srv_ngals.yaml index a28e186a..1d0006f2 100644 --- a/descqa/configs/srv_ngals.yaml +++ b/descqa/configs/srv_ngals.yaml @@ -1,28 +1,26 @@ subclass_name: srv_ngals.CheckNgals description: 'Check flags' included_by_default: true - ra: 'ra' dec: 'dec' -nside: 64 flags_to_check: - quantities: 'psFlux_flag_u' kind: 'bool' - flag_val: True + flag_val: False - quantities: 'good' kind: 'bool' - flag_val: True + flag_val: False - quantities: 'I_flag' kind: 'bool' - flag_val: True + flag_val: False - quantities: 'cModelFlux_flag_r' kind: 'bool' - flag_val: True + flag_val: False catalog_filters: - - filters: ['mag_r < 28', 'mag_i < 28'] + - filters: ['extendedness==1'] diff --git a/descqa/configs/srv_readiness.yaml b/descqa/configs/srv_readiness.yaml index 8c853f6b..5aed069e 100644 --- a/descqa/configs/srv_readiness.yaml +++ b/descqa/configs/srv_readiness.yaml @@ -2,7 +2,7 @@ subclass_name: readiness_test_synchronous.CheckQuantities description: 'Plot histograms of listed quantities and perform range, finiteness, mean and standard deviation checks.' included_by_default: true -percentile_max: 100000 +subset_size: 100000 quantities_to_check: - quantities: ['ra', 'dec'] @@ -12,13 +12,6 @@ quantities_to_check: max: [2.49, 2.51] f_inf: 0 - - quantities: ['blendedness'] - kind: 'double' - label: 'blendedness' - min: [-0.05, 0.05] - max: [0.95, 1.05] - f_inf: 0 - - quantities: 'cModelFlux_i' kind: 'double' label: 'cModelFlux' diff --git a/descqa/parallel.py b/descqa/parallel.py new file mode 100644 index 00000000..9f29cbd0 --- /dev/null +++ b/descqa/parallel.py @@ -0,0 +1,103 @@ +""" +MPI-related utility functions for descqa +""" +from __future__ import unicode_literals, division, print_function, absolute_import +import numpy as np +from mpi4py import MPI + +comm = MPI.COMM_WORLD +size = comm.Get_size() +rank = comm.Get_rank() + + +__all__ = [ + 'send_to_master', + 'get_ra_dec', +] + + +def send_to_master(value, kind): + """ + Parameters + ---------- + value : ndarray + rank-local array of values to communicate + kind : str + type of variable. 
Currently implemented options are double, bool + + Returns + ------- + recvbuf : ndarray + array of all rank values for rank 0 + None value otherwise + """ + count = len(value) + tot_num = comm.reduce(count) + counts = comm.allgather(count) + + if rank==0: + if kind=='double': + recvbuf = np.zeros(tot_num) + elif kind=='bool': + recvbuf = np.zeros(tot_num)!=0.0 + else: + raise NotImplementedError + else: + recvbuf = None + + displs = np.array([sum(counts[:p]) for p in range(size)]) + if kind=='double': + comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0) + elif kind=='float': + comm.Gatherv([value,MPI.FLOAT], [recvbuf,counts,displs,MPI.FLOAT],root=0) + elif kind=='int': + comm.Gatherv([value,MPI.INT], [recvbuf,counts,displs,MPI.INT],root=0) + elif kind=='bool': + comm.Gatherv([value,MPI.BOOL], [recvbuf,counts,displs,MPI.BOOL],root=0) + elif kind=='int64': + comm.Gatherv([value,MPI.INT64_T], [recvbuf, counts, displs, MPI.INT64_T],root=0) + else: + raise NotImplementedError + + return recvbuf + + +def get_ra_dec(ra,dec,catalog_data): + """ + Parameters + ---------- + ra : str + variable name representing right ascension + dec: str + variable name representing declination + catalog_data : GCRCatalog object + GCRCatalog object holding catalog data + + Returns + ------- + recvbuf_ra : ndarray + rank 0 outputs array of all ra values + other ranks output a None value + recvbuf_dec : ndarray + rank 0 outputs array of all dec values + other ranks output a None value + + """ + data_rank={} + recvbuf={} + for quantity in [ra,dec]: + data_rank[quantity] = catalog_data[quantity] + count = len(data_rank[quantity]) + tot_num = comm.reduce(count) + counts = comm.allgather(count) + if rank==0: + recvbuf[quantity] = np.zeros(tot_num) + else: + recvbuf[quantity] = None + displs = np.array([sum(counts[:p]) for p in range(size)]) + comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0) + recvbuf_ra = recvbuf[ra] + recvbuf_dec = recvbuf[dec] + + return recvbuf_ra, recvbuf_dec + diff --git a/descqa/srv_ngals.py b/descqa/srv_ngals.py index 424e761a..244cbfc6 100644 --- a/descqa/srv_ngals.py +++ b/descqa/srv_ngals.py @@ -2,16 +2,14 @@ import os import re import fnmatch -from itertools import cycle -from collections import defaultdict, OrderedDict +from itertools import cycle, chain +from collections import defaultdict import numpy as np -import numexpr as ne -from scipy.stats import norm from mpi4py import MPI -import healpy as hp from .base import BaseValidationTest, TestResult from .plotting import plt +from .parallel import send_to_master, get_ra_dec comm = MPI.COMM_WORLD size = comm.Get_size() @@ -27,6 +25,10 @@ class CheckNgals(BaseValidationTest): """ def __init__(self, **kwargs): + ''' + Read inputs from yaml file and initialize class + ''' + self.flags_to_check = kwargs.get('flags_to_check', []) self.catalog_filters = kwargs.get('catalog_filters', []) self.lgndtitle_fontsize = kwargs.get('lgndtitle_fontsize', 12) @@ -37,7 +39,7 @@ def __init__(self, **kwargs): self.legend_size = kwargs.get('legend_size', 'x-small') self.ra = kwargs.get('ra') self.dec = kwargs.get('dec') - self.nside = kwargs.get('nside') + if not any(( self.flags_to_check, @@ -46,43 +48,37 @@ def __init__(self, **kwargs): raise ValueError('must specify flags_to_check, catalog_filters') - self.enable_individual_summary = bool(kwargs.get('enable_individual_summary', True)) - self.enable_aggregated_summary = bool(kwargs.get('enable_aggregated_summary', False)) - 
self.always_show_plot = bool(kwargs.get('always_show_plot', True)) - - self.nbins = int(kwargs.get('nbins', 50)) self.prop_cycle = None - self.current_catalog_name = None self.current_failed_count = None - self._aggregated_header = list() - self._aggregated_table = list() self._individual_header = list() self._individual_table = list() super(CheckNgals, self).__init__(**kwargs) + def record_result(self, results, quantity_name=None, more_info=None, failed=None, individual_only=False): + ''' + Record result by updating failed count and summary + ''' if isinstance(results, dict): self.current_failed_count += sum(1 for v in results.values() if v[1] == 'fail') elif failed: self.current_failed_count += 1 - if self.enable_individual_summary: - if quantity_name is None: - self._individual_header.append(self.format_result_header(results, failed)) - else: - self._individual_table.append(self.format_result_row(results, quantity_name, more_info)) + if quantity_name is None: + # update header if no quantity name specified + self._individual_header.append(self.format_result_header(results, failed)) + else: + # else add a row to the table + self._individual_table.append(self.format_result_row(results, quantity_name, more_info)) + - if self.enable_aggregated_summary and not individual_only: - if quantity_name is None: - results = '{} {}'.format(self.current_catalog_name, results) if self.current_catalog_name else results - self._aggregated_header.append(self.format_result_header(results, failed)) - else: - quantity_name = '{} {}'.format(self.current_catalog_name, quantity_name) if self.current_catalog_name else quantity_name - self._aggregated_table.append(self.format_result_row(results, quantity_name, more_info)) def format_result_row(self, results, quantity_name, more_info): + ''' + Add result, fail class and variable title to output rows + ''' more_info = 'title="{}"'.format(more_info) if more_info else '' output = ['', '{0}'.format(quantity_name, more_info)] for s in range(2): @@ -92,19 +88,19 @@ def format_result_row(self, results, quantity_name, more_info): @staticmethod def format_result_header(results, failed=False): + ''' + Add non-table result to summary + ''' return '{0}'.format(results, 'class="fail"' if failed else '') - def generate_summary(self, output_dir, aggregated=False): - if aggregated: - if not self.enable_aggregated_summary: - return - header = self._aggregated_header - table = self._aggregated_table - else: - if not self.enable_individual_summary: - return - header = self._individual_header - table = self._individual_table + + def generate_summary(self, output_dir): + ''' + Generate summary table + ''' + + header = self._individual_header + table = self._individual_table with open(os.path.join(output_dir, 'SUMMARY.html'), 'w') as f: f.write('\n') @@ -116,7 +112,6 @@ def generate_summary(self, output_dir, aggregated=False): f.write('\n') f.write('
\n') - f.write('\n') for s in ['ngals (per sq arcmin)', 'percentage retained']: f.write(''.format(s)) @@ -126,11 +121,10 @@ def generate_summary(self, output_dir, aggregated=False): f.write('\n') f.write('
Quantity{}
\n') + self._individual_header.clear() + self._individual_table.clear() - if not aggregated: - self._individual_header.clear() - self._individual_table.clear() def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): @@ -140,7 +134,6 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): self.current_catalog_name = catalog_name self.current_failed_count = 0 galaxy_count = None - quantity_hashes = defaultdict(set) if rank==0: self.record_result('Running galaxy number test on {} {}'.format( @@ -153,19 +146,19 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): catalog_name = catalog_name.partition("_")[0] version = getattr(catalog_instance, 'version', '') if not self.no_version else '' + # create filter labels filters=[] for i, filt in enumerate(self.catalog_filters): - filters = filt['filters'] - - lgnd_loc_dflt ='best' + filters.append(filt['filters']) + filters = list(chain(*filters)) flags_tot =[] label_tot=[] plots_tot=[] for i, checks in enumerate(self.flags_to_check): - # total list of quantities + # total list of quantities - note do we need the quantity_pattern matching here or can we assume we know the input quantities? quantity_patterns = checks['quantities'] if isinstance(checks['quantities'], (tuple, list)) else [checks['quantities']] quantities_this = set() @@ -190,18 +183,10 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): plots_tot.append(plot_filename) - quantities_this_new=[] - for q in flags_tot: - print(q) - if len(q)>1: - for j in q: - quantities_this_new.append(j) - else: - quantities_this_new.append(q[0]) + quantities_this_new = list(chain(*flags_tot)) quantities_this_new.append(self.ra) quantities_this_new.append(self.dec) - quantities_this_new = tuple(quantities_this_new) - print(quantities_this_new) + if len(filters) > 0: @@ -209,42 +194,13 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): else: catalog_data = catalog_instance.get_quantities(quantities_this_new,return_iterator=False, rank=rank, size=size) - data_rank={} - recvbuf={} - for quantity in [self.ra,self.dec]: - data_rank[quantity] = catalog_data[quantity] - count = len(data_rank[quantity]) - tot_num = comm.reduce(count) - counts = comm.allgather(count) - if rank==0: - recvbuf[quantity] = np.zeros(tot_num) - else: - recvbuf[quantity] = None - displs = np.array([sum(counts[:p]) for p in range(size)]) - comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0) - recvbuf_ra = recvbuf[self.ra] - recvbuf_dec = recvbuf[self.dec] + recvbuf_ra, recvbuf_dec = get_ra_dec(self.ra,self.dec,catalog_data) if rank==0: galaxy_count = len(recvbuf_ra) self.record_result('Found {} entries in this catalog.'.format(galaxy_count)) - #idx_vals = np.arange(len(recvbuf_ra)) - #np.random.shuffle(idx_vals) - #for i in range(10000):#catalog_instance.get_quantities(['ra_true', 'dec_true'], return_iterator=True): - # pixels.update(hp.ang2pix(self.nside, recvbuf_ra[idx_vals[i]], recvbuf_dec[idx_vals[i]], lonlat=True)) - #frac = len(pixels) / hp.nside2npix(self.nside) - #skyarea = frac * np.rad2deg(np.rad2deg(4.0*np.pi)) - - #print(skyarea) - - #hp_map = np.empty(hp.nside2npix(self.nside)) - #hp_map.fill(hp.UNSEEN) - #hp_map[list(pixels)] = 0 - #hp.mollview(hp_map, title=catalog_name, coord='C', cbar=None) - - for i, checks in enumerate(self.flags_to_check): #quantities_this = checks['quantities'] kind = checks['kind'] @@ -257,66 +213,41 @@ def 
run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): fig, ax = plt.subplots() for quantity in quantities_this: - #PL : only currently works for doubles + #PL : only currently works for doubles and booleans value = catalog_data[quantity] - count = len(value) - tot_num = comm.reduce(count) - counts = comm.allgather(count) - if rank==0: - if kind=='double': - recvbuf = np.zeros(tot_num) - elif kind=='bool': - recvbuf = np.zeros(tot_num)!=0.0 - else: - recvbuf = None - displs = np.array([sum(counts[:p]) for p in range(size)]) - if kind=='double': - comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0) - elif kind=='float': - comm.Gatherv([value,MPI.FLOAT], [recvbuf,counts,displs,MPI.FLOAT],root=0) - elif kind=='int': - comm.Gatherv([value,MPI.INT], [recvbuf,counts,displs,MPI.INT],root=0) - elif kind=='bool': - comm.Gatherv([value,MPI.BOOL], [recvbuf,counts,displs,MPI.BOOL],root=0) - elif kind=='int64': - comm.Gatherv([value,MPI.INT64_T], [recvbuf, counts, displs, MPI.INT64_T],root=0) - else: - print("add proper exception catch here") - - + recvbuf = send_to_master(value, kind) + if rank==0: + flag_val=False result_this_quantity = {} galaxy_count = len(recvbuf) + recvbuf = np.logical_not(recvbuf) frac = np.sum(recvbuf)/(len(recvbuf)+0.0)*100. xbins = np.linspace(np.min(recvbuf_ra),np.max(recvbuf_ra),50) ybins = np.linspace(np.min(recvbuf_dec),np.max(recvbuf_dec),50) - area = (xbins[1]-xbins[0])*(ybins[1]-ybins[0])*(60.**2) #arcminutes - im = ax.hist2d(recvbuf_ra[recvbuf],recvbuf_dec[recvbuf], bins=(xbins,ybins),weights = 1./area*np.ones(len(recvbuf_ra[recvbuf])))#, label=quantity) + area = (xbins[1]-xbins[0])*(ybins[1]-ybins[0])*(60.**2)*(np.sin(ybins[1]*np.pi/180.)-np.sin(ybins[0]*np.pi/180.)) + im = ax.hist2d(recvbuf_ra[recvbuf],recvbuf_dec[recvbuf], bins=(xbins,ybins),weights = 1./area*np.ones(len(recvbuf_ra[recvbuf]))) result_this_quantity[0] = ( - frac, - ('fail' if frac<50 else 'pass'), + np.mean(im[0][im[0]>0]), + 'pass', quantity, ) - result_this_quantity[1] = ( - np.mean(im[0][im[0]>0]), - ('fail' if np.mean(im[0][im[0]>0]<1.0) else 'pass'), + frac, + 'pass', quantity, ) - quantity_hashes[tuple(result_this_quantity[s][0] for s in [0,1])].add(quantity) self.record_result( result_this_quantity, quantity , - plots_tot[i] ) - - #ax.set_xlabel(('log ' if checks.get('log') else '') + quantity_group_label, size=self.font_size) - #ax.yaxis.set_ticklabels([]) + + if checks.get('plot_min') is not None: #zero values fail otherwise ax.set_xlim(left=checks.get('plot_min')) if checks.get('plot_max') is not None: @@ -328,14 +259,16 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): fig.savefig(os.path.join(output_dir, plots_tot[i])) plt.close(fig) + if rank==0: self.generate_summary(output_dir) else: self.current_failed_count=0 - + self.current_failed_count = comm.bcast(self.current_failed_count, root=0) return TestResult(passed=(self.current_failed_count == 0), score=self.current_failed_count) + def conclude_test(self, output_dir): - self.generate_summary(output_dir, aggregated=True) + self.generate_summary(output_dir) diff --git a/descqa/srv_readiness_test.py b/descqa/srv_readiness_test.py index 08786e18..20deb515 100644 --- a/descqa/srv_readiness_test.py +++ b/descqa/srv_readiness_test.py @@ -37,13 +37,13 @@ def check_uniqueness(x, mask=None): return check_uniqueness(x[mask]) -def find_outlier(x): +def find_outlier(x, subset_size): """ return a bool array indicating outliers or not in *x* """ # note: percentile calculation 
should be robust to outliers so doesn't need the full sample. This speeds the calculation up a lot. There is a chance of repeated values but for large datasets this is not significant. - if len(x)>100000: - x_small = np.random.choice(x,size=10000) + if len(x)> subset_size: + x_small = np.random.choice(x,size=subset_size) l, m, h = np.percentile(x_small, norm.cdf([-1, 0, 1])*100) else: l, m, h = np.percentile(x_small, norm.cdf([-1, 0, 1])*100) @@ -59,12 +59,12 @@ def calc_frac(x, func, total=None): return np.count_nonzero(func(x)) / total -def calc_median(x): +def calc_median(x, subset_size): """ calculate the median of sample, using sub-set for large datasets """ - if len(x)>100000: - x_small = np.random.choice(x,size=10000) + if len(x)>subset_size: + x_small = np.random.choice(x,size=subset_size) return np.median(x_small) else: return np.median(x) @@ -115,7 +115,7 @@ class CheckQuantities(BaseValidationTest): stats = OrderedDict(( ('min', np.min), ('max', np.max), - ('median', np.median), + ('median', calc_median), ('mean', np.mean), ('std', np.std), ('f_inf', np.isinf), @@ -135,6 +135,7 @@ def __init__(self, **kwargs): self.title_size = kwargs.get('title_size', 'small') self.font_size = kwargs.get('font_size', 12) self.legend_size = kwargs.get('legend_size', 'x-small') + self.subset_size = kwargs.get('subset_size',100000) if not any(( self.quantities_to_check, @@ -347,17 +348,10 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): for quantity in quantities_this: - #PL : only currently works for doubles + #PL : only currently works for doubles and booleans right now value = catalog_data[quantity] - count = len(value) - tot_num = comm.reduce(count) - counts = comm.allgather(count) - if rank==0: - recvbuf = np.zeros(tot_num) - else: - recvbuf = None - displs = np.array([sum(counts[:p]) for p in range(size)]) - comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0) + recvbuf = send_to_master(value, 'double') + need_plot = False if rank==0: @@ -383,7 +377,9 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): b = time.time() # PL: note there are many faster ways to do this if s == 'f_outlier': - s_value_rank = calc_frac(value_finite, func) + s_value_rank = calc_frac(value_finite, self.subset_size) + elif s == 'median': + s_value_rank = calc_median(value_finite, self.subset_size) elif s.startswith('f_'): s_value = calc_frac(value, func) else: diff --git a/descqa/srv_s1s2.py b/descqa/srv_s1s2.py index fc18e260..84d0567e 100644 --- a/descqa/srv_s1s2.py +++ b/descqa/srv_s1s2.py @@ -257,15 +257,7 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir): recvbuf={} for quantity in quantities: data_rank[quantity] = catalog_data[quantity] - count = len(data_rank[quantity]) - tot_num = comm.reduce(count) - counts = comm.allgather(count) - if rank==0: - recvbuf[quantity] = np.zeros(tot_num) - else: - recvbuf[quantity] = None - displs = np.array([sum(counts[:p]) for p in range(size)]) - comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0) + recvbuf[quantity] = send_to_master(data_rank[quantity],'double') e1,e2 = shear_from_moments(recvbuf[self.Ixx+'_'+band],recvbuf[self.Ixy+'_'+band],recvbuf[self.Iyy+'_'+band]) e1psf,e2psf = shear_from_moments(recvbuf[self.IxxPSF+'_'+band],recvbuf[self.IxyPSF+'_'+band],recvbuf[self.IyyPSF+'_'+band])
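
The helpers collected in the new descqa/parallel.py are collective MPI operations: every rank calls them with its rank-local chunk of data, and only rank 0 receives the concatenated array (all other ranks get None back). A minimal usage sketch follows; the fabricated catalog dictionary, quantity names, and mpirun invocation are illustrative placeholders, not anything defined by this patch.

# Sketch of the calling pattern for the new descqa.parallel helpers
# (placeholder data; run under MPI, e.g. `mpirun -n 4 python demo_parallel.py`).
import numpy as np
from mpi4py import MPI

from descqa.parallel import send_to_master, get_ra_dec

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Stand-in for the per-rank dict returned by catalog_instance.get_quantities(...):
# each rank fabricates its own chunk of ra/dec and a boolean flag column.
rng = np.random.default_rng(rank)
nobj = 1000
catalog_data = {
    'ra': rng.uniform(50.0, 60.0, nobj),
    'dec': rng.uniform(-40.0, -30.0, nobj),
    'good': rng.uniform(0.0, 1.0, nobj) > 0.1,
}

# Gather a double column and a boolean flag column onto rank 0.
all_ra = send_to_master(catalog_data['ra'], 'double')
all_good = send_to_master(catalog_data['good'], 'bool')

# Convenience wrapper that gathers both coordinate columns in one call.
gathered_ra, gathered_dec = get_ra_dec('ra', 'dec', catalog_data)

if rank == 0:
    print(len(all_ra), 'objects gathered;', np.count_nonzero(all_good), 'flagged good')
    print('dec range:', gathered_dec.min(), gathered_dec.max())
# On all other ranks the gathered values above are None.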
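
On the readiness side, the fix replaces the hard-coded 100000/10000 sample sizes with a single configurable subset_size threaded through from the yaml config. A standalone sketch of the intended behaviour is below. The function names mirror srv_readiness_test.py, but two details are assumptions: the small-array branch here falls back to the full array x (the unchanged context above still references x_small in that branch), and the 3-sigma-style cut in the return line stands in for the real outlier criterion, which the hunk does not show.

# Illustrative re-statement of the subset-based statistics used by the
# readiness test; assumptions are noted inline, this is not a verbatim copy.
import numpy as np
from scipy.stats import norm


def find_outlier(x, subset_size=100000):
    """Flag outliers using percentiles estimated from a random subsample."""
    if len(x) > subset_size:
        x_small = np.random.choice(x, size=subset_size)
    else:
        x_small = x  # assumed fallback: use the full array when it is small
    # 16th/50th/84th percentiles, i.e. a robust centre and ~1-sigma spread
    l, m, h = np.percentile(x_small, norm.cdf([-1, 0, 1]) * 100)
    d = 0.5 * (h - l)
    # assumed criterion: flag points more than 3 "sigma" from the median
    return (x > m + 3.0 * d) | (x < m - 3.0 * d)


def calc_median(x, subset_size=100000):
    """Median of a random subsample for large arrays, exact median otherwise."""
    if len(x) > subset_size:
        return np.median(np.random.choice(x, size=subset_size))
    return np.median(x)


values = np.concatenate([np.random.normal(size=500000), [80.0, -90.0]])
print(calc_median(values), np.count_nonzero(find_outlier(values)))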
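
The quantity plotted by the ngals check is the count of unflagged objects per square arcminute in RA-Dec cells. A small helper sketching that normalization is below; it uses the standard spherical cell area delta_ra * (sin dec_hi - sin dec_lo), and treating that as the normalization intended by the weighting expression in the hunk above is an assumption, so the helper is illustrative rather than a drop-in replacement.

# Sketch: convert a 2D RA-Dec histogram into counts per square arcminute,
# assuming the usual spherical expression for the area of each cell.
import numpy as np


def density_per_sq_arcmin(ra_deg, dec_deg, nbins=50):
    """Return (counts per sq arcmin, ra edges, dec edges) for the input points."""
    xbins = np.linspace(ra_deg.min(), ra_deg.max(), nbins)
    ybins = np.linspace(dec_deg.min(), dec_deg.max(), nbins)
    counts, _, _ = np.histogram2d(ra_deg, dec_deg, bins=(xbins, ybins))
    # Cell solid angle in square degrees: d(ra_deg) * d(sin dec) * 180/pi,
    # then 1 deg^2 = 3600 arcmin^2.
    dra = np.diff(xbins)
    dsindec = np.diff(np.sin(np.radians(ybins)))
    cell_sqdeg = np.outer(dra, dsindec) * (180.0 / np.pi)
    return counts / (cell_sqdeg * 3600.0), xbins, ybins


rng = np.random.default_rng(0)
ra, dec = rng.uniform(50, 60, 20000), rng.uniform(-40, -30, 20000)
dens, _, _ = density_per_sq_arcmin(ra, dec)
print('mean density (per sq arcmin):', dens.mean())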