diff --git a/descqa/configs/srv_ngals.yaml b/descqa/configs/srv_ngals.yaml
index a28e186a..1d0006f2 100644
--- a/descqa/configs/srv_ngals.yaml
+++ b/descqa/configs/srv_ngals.yaml
@@ -1,28 +1,26 @@
subclass_name: srv_ngals.CheckNgals
description: 'Check flags'
included_by_default: true
-
ra: 'ra'
dec: 'dec'
-nside: 64
flags_to_check:
- quantities: 'psFlux_flag_u'
kind: 'bool'
- flag_val: True
+ flag_val: False
- quantities: 'good'
kind: 'bool'
- flag_val: True
+ flag_val: False
- quantities: 'I_flag'
kind: 'bool'
- flag_val: True
+ flag_val: False
- quantities: 'cModelFlux_flag_r'
kind: 'bool'
- flag_val: True
+ flag_val: False
catalog_filters:
- - filters: ['mag_r < 28', 'mag_i < 28']
+ - filters: ['extendedness==1']
diff --git a/descqa/configs/srv_readiness.yaml b/descqa/configs/srv_readiness.yaml
index 8c853f6b..5aed069e 100644
--- a/descqa/configs/srv_readiness.yaml
+++ b/descqa/configs/srv_readiness.yaml
@@ -2,7 +2,7 @@ subclass_name: readiness_test_synchronous.CheckQuantities
description: 'Plot histograms of listed quantities and perform range, finiteness, mean and standard deviation checks.'
included_by_default: true
-percentile_max: 100000
+subset_size: 100000
quantities_to_check:
- quantities: ['ra', 'dec']
@@ -12,13 +12,6 @@ quantities_to_check:
max: [2.49, 2.51]
f_inf: 0
- - quantities: ['blendedness']
- kind: 'double'
- label: 'blendedness'
- min: [-0.05, 0.05]
- max: [0.95, 1.05]
- f_inf: 0
-
- quantities: 'cModelFlux_i'
kind: 'double'
label: 'cModelFlux'
diff --git a/descqa/parallel.py b/descqa/parallel.py
new file mode 100644
index 00000000..9f29cbd0
--- /dev/null
+++ b/descqa/parallel.py
@@ -0,0 +1,103 @@
+"""
+MPI-related utility functions for descqa
+"""
+from __future__ import unicode_literals, division, print_function, absolute_import
+import numpy as np
+from mpi4py import MPI
+
+comm = MPI.COMM_WORLD
+size = comm.Get_size()
+rank = comm.Get_rank()
+
+
+__all__ = [
+ 'send_to_master',
+ 'get_ra_dec',
+]
+
+
+def send_to_master(value, kind):
+ """
+ Parameters
+ ----------
+ value : ndarray
+ rank-local array of values to communicate
+ kind : str
+ type of variable. Currently implemented options are double, bool
+
+ Returns
+ -------
+ recvbuf : ndarray
+ array of all rank values for rank 0
+ None value otherwise
+ """
+ count = len(value)
+ tot_num = comm.reduce(count)
+ counts = comm.allgather(count)
+
+ if rank==0:
+ if kind=='double':
+ recvbuf = np.zeros(tot_num)
+ elif kind=='bool':
+ recvbuf = np.zeros(tot_num)!=0.0
+ else:
+ raise NotImplementedError
+ else:
+ recvbuf = None
+
+ displs = np.array([sum(counts[:p]) for p in range(size)])
+ if kind=='double':
+ comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0)
+ elif kind=='float':
+ comm.Gatherv([value,MPI.FLOAT], [recvbuf,counts,displs,MPI.FLOAT],root=0)
+ elif kind=='int':
+ comm.Gatherv([value,MPI.INT], [recvbuf,counts,displs,MPI.INT],root=0)
+ elif kind=='bool':
+ comm.Gatherv([value,MPI.BOOL], [recvbuf,counts,displs,MPI.BOOL],root=0)
+ elif kind=='int64':
+ comm.Gatherv([value,MPI.INT64_T], [recvbuf, counts, displs, MPI.INT64_T],root=0)
+ else:
+ raise NotImplementedError
+
+ return recvbuf
+
+
+def get_ra_dec(ra,dec,catalog_data):
+ """
+ Parameters
+ ----------
+ ra : str
+ variable name representing right ascension
+ dec: str
+ variable name representing declination
+ catalog_data : GCRCatalog object
+ GCRCatalog object holding catalog data
+
+ Returns
+ -------
+ recvbuf_ra : ndarray
+ rank 0 outputs array of all ra values
+ other ranks output a None value
+ recvbuf_dec : ndarray
+ rank 0 outputs array of all dec values
+ other ranks output a None value
+
+ """
+ data_rank={}
+ recvbuf={}
+ for quantity in [ra,dec]:
+ data_rank[quantity] = catalog_data[quantity]
+ count = len(data_rank[quantity])
+ tot_num = comm.reduce(count)
+ counts = comm.allgather(count)
+ if rank==0:
+ recvbuf[quantity] = np.zeros(tot_num)
+ else:
+ recvbuf[quantity] = None
+ displs = np.array([sum(counts[:p]) for p in range(size)])
+ comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0)
+ recvbuf_ra = recvbuf[ra]
+ recvbuf_dec = recvbuf[dec]
+
+ return recvbuf_ra, recvbuf_dec
+
diff --git a/descqa/srv_ngals.py b/descqa/srv_ngals.py
index 424e761a..244cbfc6 100644
--- a/descqa/srv_ngals.py
+++ b/descqa/srv_ngals.py
@@ -2,16 +2,14 @@
import os
import re
import fnmatch
-from itertools import cycle
-from collections import defaultdict, OrderedDict
+from itertools import cycle, chain
+from collections import defaultdict
import numpy as np
-import numexpr as ne
-from scipy.stats import norm
from mpi4py import MPI
-import healpy as hp
from .base import BaseValidationTest, TestResult
from .plotting import plt
+from .parallel import send_to_master, get_ra_dec
comm = MPI.COMM_WORLD
size = comm.Get_size()
@@ -27,6 +25,10 @@ class CheckNgals(BaseValidationTest):
"""
def __init__(self, **kwargs):
+ '''
+ Read inputs from yaml file and initialize class
+ '''
+
self.flags_to_check = kwargs.get('flags_to_check', [])
self.catalog_filters = kwargs.get('catalog_filters', [])
self.lgndtitle_fontsize = kwargs.get('lgndtitle_fontsize', 12)
@@ -37,7 +39,7 @@ def __init__(self, **kwargs):
self.legend_size = kwargs.get('legend_size', 'x-small')
self.ra = kwargs.get('ra')
self.dec = kwargs.get('dec')
- self.nside = kwargs.get('nside')
+
if not any((
self.flags_to_check,
@@ -46,43 +48,37 @@ def __init__(self, **kwargs):
raise ValueError('must specify flags_to_check, catalog_filters')
- self.enable_individual_summary = bool(kwargs.get('enable_individual_summary', True))
- self.enable_aggregated_summary = bool(kwargs.get('enable_aggregated_summary', False))
- self.always_show_plot = bool(kwargs.get('always_show_plot', True))
-
- self.nbins = int(kwargs.get('nbins', 50))
self.prop_cycle = None
-
self.current_catalog_name = None
self.current_failed_count = None
- self._aggregated_header = list()
- self._aggregated_table = list()
self._individual_header = list()
self._individual_table = list()
super(CheckNgals, self).__init__(**kwargs)
+
def record_result(self, results, quantity_name=None, more_info=None, failed=None, individual_only=False):
+ '''
+ Record result by updating failed count and summary
+ '''
if isinstance(results, dict):
self.current_failed_count += sum(1 for v in results.values() if v[1] == 'fail')
elif failed:
self.current_failed_count += 1
- if self.enable_individual_summary:
- if quantity_name is None:
- self._individual_header.append(self.format_result_header(results, failed))
- else:
- self._individual_table.append(self.format_result_row(results, quantity_name, more_info))
+ if quantity_name is None:
+ # update header if no quantity name specified
+ self._individual_header.append(self.format_result_header(results, failed))
+ else:
+ # else add a row to the table
+ self._individual_table.append(self.format_result_row(results, quantity_name, more_info))
+
- if self.enable_aggregated_summary and not individual_only:
- if quantity_name is None:
- results = '{} {}'.format(self.current_catalog_name, results) if self.current_catalog_name else results
- self._aggregated_header.append(self.format_result_header(results, failed))
- else:
- quantity_name = '{} {}'.format(self.current_catalog_name, quantity_name) if self.current_catalog_name else quantity_name
- self._aggregated_table.append(self.format_result_row(results, quantity_name, more_info))
def format_result_row(self, results, quantity_name, more_info):
+ '''
+ Add result, fail class and variable title to output rows
+ '''
more_info = 'title="{}"'.format(more_info) if more_info else ''
output = ['
', '{0} | '.format(quantity_name, more_info)]
for s in range(2):
@@ -92,19 +88,19 @@ def format_result_row(self, results, quantity_name, more_info):
@staticmethod
def format_result_header(results, failed=False):
+ '''
+ Add non-table result to summary
+ '''
return '{0}'.format(results, 'class="fail"' if failed else '')
- def generate_summary(self, output_dir, aggregated=False):
- if aggregated:
- if not self.enable_aggregated_summary:
- return
- header = self._aggregated_header
- table = self._aggregated_table
- else:
- if not self.enable_individual_summary:
- return
- header = self._individual_header
- table = self._individual_table
+
+ def generate_summary(self, output_dir):
+ '''
+ Generate summary table
+ '''
+
+ header = self._individual_header
+ table = self._individual_table
with open(os.path.join(output_dir, 'SUMMARY.html'), 'w') as f:
f.write('\n')
@@ -116,7 +112,6 @@ def generate_summary(self, output_dir, aggregated=False):
f.write('\n')
f.write('
\n')
-
f.write('Quantity | \n')
for s in ['ngals (per sq arcmin)', 'percentage retained']:
f.write('{} | '.format(s))
@@ -126,11 +121,10 @@ def generate_summary(self, output_dir, aggregated=False):
f.write('\n')
f.write('
\n')
+ self._individual_header.clear()
+ self._individual_table.clear()
- if not aggregated:
- self._individual_header.clear()
- self._individual_table.clear()
def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
@@ -140,7 +134,6 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
self.current_catalog_name = catalog_name
self.current_failed_count = 0
galaxy_count = None
- quantity_hashes = defaultdict(set)
if rank==0:
self.record_result('Running galaxy number test on {} {}'.format(
@@ -153,19 +146,19 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
catalog_name = catalog_name.partition("_")[0]
version = getattr(catalog_instance, 'version', '') if not self.no_version else ''
+
# create filter labels
filters=[]
for i, filt in enumerate(self.catalog_filters):
- filters = filt['filters']
-
- lgnd_loc_dflt ='best'
+ filters.append(filt['filters'])
+ filters = list(chain(*filters))
flags_tot =[]
label_tot=[]
plots_tot=[]
for i, checks in enumerate(self.flags_to_check):
- # total list of quantities
+ # total list of quantities - note do we need the quantity_pattern matching here or can we assume we know the input quantities?
quantity_patterns = checks['quantities'] if isinstance(checks['quantities'], (tuple, list)) else [checks['quantities']]
quantities_this = set()
@@ -190,18 +183,10 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
plots_tot.append(plot_filename)
- quantities_this_new=[]
- for q in flags_tot:
- print(q)
- if len(q)>1:
- for j in q:
- quantities_this_new.append(j)
- else:
- quantities_this_new.append(q[0])
+ quantities_this_new = list(chain(*flags_tot))
quantities_this_new.append(self.ra)
quantities_this_new.append(self.dec)
- quantities_this_new = tuple(quantities_this_new)
- print(quantities_this_new)
+
if len(filters) > 0:
@@ -209,42 +194,13 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
else:
catalog_data = catalog_instance.get_quantities(quantities_this_new,return_iterator=False, rank=rank, size=size)
- data_rank={}
- recvbuf={}
- for quantity in [self.ra,self.dec]:
- data_rank[quantity] = catalog_data[quantity]
- count = len(data_rank[quantity])
- tot_num = comm.reduce(count)
- counts = comm.allgather(count)
- if rank==0:
- recvbuf[quantity] = np.zeros(tot_num)
- else:
- recvbuf[quantity] = None
- displs = np.array([sum(counts[:p]) for p in range(size)])
- comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0)
- recvbuf_ra = recvbuf[self.ra]
- recvbuf_dec = recvbuf[self.dec]
+ recvbuf_ra, recvbuf_dec = get_ra_dec(self.ra,self.dec,catalog_data)
if rank==0:
galaxy_count = len(recvbuf_ra)
self.record_result('Found {} entries in this catalog.'.format(galaxy_count))
- #idx_vals = np.arange(len(recvbuf_ra))
- #np.random.shuffle(idx_vals)
- #for i in range(10000):#catalog_instance.get_quantities(['ra_true', 'dec_true'], return_iterator=True):
- # pixels.update(hp.ang2pix(self.nside, recvbuf_ra[idx_vals[i]], recvbuf_dec[idx_vals[i]], lonlat=True))
- #frac = len(pixels) / hp.nside2npix(self.nside)
- #skyarea = frac * np.rad2deg(np.rad2deg(4.0*np.pi))
-
- #print(skyarea)
-
- #hp_map = np.empty(hp.nside2npix(self.nside))
- #hp_map.fill(hp.UNSEEN)
- #hp_map[list(pixels)] = 0
- #hp.mollview(hp_map, title=catalog_name, coord='C', cbar=None)
-
-
for i, checks in enumerate(self.flags_to_check):
#quantities_this = checks['quantities']
kind = checks['kind']
@@ -257,66 +213,41 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
fig, ax = plt.subplots()
for quantity in quantities_this:
- #PL : only currently works for doubles
+ #PL : only currently works for doubles and booleans
value = catalog_data[quantity]
- count = len(value)
- tot_num = comm.reduce(count)
- counts = comm.allgather(count)
- if rank==0:
- if kind=='double':
- recvbuf = np.zeros(tot_num)
- elif kind=='bool':
- recvbuf = np.zeros(tot_num)!=0.0
- else:
- recvbuf = None
- displs = np.array([sum(counts[:p]) for p in range(size)])
- if kind=='double':
- comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0)
- elif kind=='float':
- comm.Gatherv([value,MPI.FLOAT], [recvbuf,counts,displs,MPI.FLOAT],root=0)
- elif kind=='int':
- comm.Gatherv([value,MPI.INT], [recvbuf,counts,displs,MPI.INT],root=0)
- elif kind=='bool':
- comm.Gatherv([value,MPI.BOOL], [recvbuf,counts,displs,MPI.BOOL],root=0)
- elif kind=='int64':
- comm.Gatherv([value,MPI.INT64_T], [recvbuf, counts, displs, MPI.INT64_T],root=0)
- else:
- print("add proper exception catch here")
-
-
+ recvbuf = send_to_master(value, kind)
+
if rank==0:
+            flag_val=False  # NOTE(review): appears unused in the added code -- the np.logical_not below assumes flags are True for bad objects; confirm and drop this line if dead
result_this_quantity = {}
galaxy_count = len(recvbuf)
+ recvbuf = np.logical_not(recvbuf)
frac = np.sum(recvbuf)/(len(recvbuf)+0.0)*100.
xbins = np.linspace(np.min(recvbuf_ra),np.max(recvbuf_ra),50)
ybins = np.linspace(np.min(recvbuf_dec),np.max(recvbuf_dec),50)
- area = (xbins[1]-xbins[0])*(ybins[1]-ybins[0])*(60.**2) #arcminutes
- im = ax.hist2d(recvbuf_ra[recvbuf],recvbuf_dec[recvbuf], bins=(xbins,ybins),weights = 1./area*np.ones(len(recvbuf_ra[recvbuf])))#, label=quantity)
+                    area = (xbins[1]-xbins[0])*(ybins[1]-ybins[0])*(60.**2)*(np.sin(ybins[1]*np.pi/180.)-np.sin(ybins[0]*np.pi/180.))  # NOTE(review): multiplying by both the dec bin width and the sin(dec) difference looks like it double-counts the dec extent, and only the first dec bin is used for every bin -- confirm intended sky-area formula
+ im = ax.hist2d(recvbuf_ra[recvbuf],recvbuf_dec[recvbuf], bins=(xbins,ybins),weights = 1./area*np.ones(len(recvbuf_ra[recvbuf])))
result_this_quantity[0] = (
- frac,
- ('fail' if frac<50 else 'pass'),
+ np.mean(im[0][im[0]>0]),
+ 'pass',
quantity,
)
-
result_this_quantity[1] = (
- np.mean(im[0][im[0]>0]),
- ('fail' if np.mean(im[0][im[0]>0]<1.0) else 'pass'),
+ frac,
+ 'pass',
quantity,
)
- quantity_hashes[tuple(result_this_quantity[s][0] for s in [0,1])].add(quantity)
self.record_result(
result_this_quantity,
quantity ,
- plots_tot[i]
)
-
- #ax.set_xlabel(('log ' if checks.get('log') else '') + quantity_group_label, size=self.font_size)
- #ax.yaxis.set_ticklabels([])
+
+
if checks.get('plot_min') is not None: #zero values fail otherwise
ax.set_xlim(left=checks.get('plot_min'))
if checks.get('plot_max') is not None:
@@ -328,14 +259,16 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
fig.savefig(os.path.join(output_dir, plots_tot[i]))
plt.close(fig)
+
if rank==0:
self.generate_summary(output_dir)
else:
self.current_failed_count=0
-
+
self.current_failed_count = comm.bcast(self.current_failed_count, root=0)
return TestResult(passed=(self.current_failed_count == 0), score=self.current_failed_count)
+
def conclude_test(self, output_dir):
- self.generate_summary(output_dir, aggregated=True)
+ self.generate_summary(output_dir)
diff --git a/descqa/srv_readiness_test.py b/descqa/srv_readiness_test.py
index 08786e18..62c6feb9 100644
--- a/descqa/srv_readiness_test.py
+++ b/descqa/srv_readiness_test.py
@@ -37,13 +37,13 @@ def check_uniqueness(x, mask=None):
return check_uniqueness(x[mask])
-def find_outlier(x):
+def find_outlier(x, subset_size):
"""
return a bool array indicating outliers or not in *x*
"""
# note: percentile calculation should be robust to outliers so doesn't need the full sample. This speeds the calculation up a lot. There is a chance of repeated values but for large datasets this is not significant.
- if len(x)>100000:
- x_small = np.random.choice(x,size=10000)
+ if len(x)> subset_size:
+ x_small = np.random.choice(x,size=subset_size)
l, m, h = np.percentile(x_small, norm.cdf([-1, 0, 1])*100)
else:
-        l, m, h = np.percentile(x_small, norm.cdf([-1, 0, 1])*100)
+        l, m, h = np.percentile(x, norm.cdf([-1, 0, 1])*100)
@@ -59,12 +59,12 @@ def calc_frac(x, func, total=None):
return np.count_nonzero(func(x)) / total
-def calc_median(x):
+def calc_median(x, subset_size):
"""
calculate the median of sample, using sub-set for large datasets
"""
- if len(x)>100000:
- x_small = np.random.choice(x,size=10000)
+ if len(x)>subset_size:
+ x_small = np.random.choice(x,size=subset_size)
return np.median(x_small)
else:
return np.median(x)
@@ -115,7 +115,7 @@ class CheckQuantities(BaseValidationTest):
stats = OrderedDict((
('min', np.min),
('max', np.max),
- ('median', np.median),
+ ('median', calc_median),
('mean', np.mean),
('std', np.std),
('f_inf', np.isinf),
@@ -135,6 +135,7 @@ def __init__(self, **kwargs):
self.title_size = kwargs.get('title_size', 'small')
self.font_size = kwargs.get('font_size', 12)
self.legend_size = kwargs.get('legend_size', 'x-small')
+ self.subset_size = kwargs.get('subset_size',100000)
if not any((
self.quantities_to_check,
@@ -294,9 +295,9 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
quantity_tot =[]
label_tot=[]
plots_tot=[]
+ checks_tot = []
for i, checks in enumerate(self.quantities_to_check):
# total list of quantities
-
quantity_patterns = checks['quantities'] if isinstance(checks['quantities'], (tuple, list)) else [checks['quantities']]
quantities_this = set()
@@ -319,6 +320,7 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
plot_filename = 'p{:02d}_{}.png'.format(i, quantity_group_label)
label_tot.append(quantity_group_label)
plots_tot.append(plot_filename)
+ checks_tot.append(checks)
quantities_this_new=[]
@@ -347,17 +349,10 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
for quantity in quantities_this:
- #PL : only currently works for doubles
+ #PL : only currently works for doubles and booleans right now
value = catalog_data[quantity]
- count = len(value)
- tot_num = comm.reduce(count)
- counts = comm.allgather(count)
- if rank==0:
- recvbuf = np.zeros(tot_num)
- else:
- recvbuf = None
- displs = np.array([sum(counts[:p]) for p in range(size)])
- comm.Gatherv([value,MPI.DOUBLE], [recvbuf,counts,displs,MPI.DOUBLE],root=0)
+            recvbuf = send_to_master(value, 'double')  # NOTE(review): requires 'from .parallel import send_to_master' in this file; no import hunk for it appears in this patch
+
need_plot = False
if rank==0:
@@ -383,7 +378,9 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
b = time.time()
# PL: note there are many faster ways to do this
if s == 'f_outlier':
- s_value_rank = calc_frac(value_finite, func)
+            s_value_rank = calc_frac(value_finite, lambda v: find_outlier(v, self.subset_size))
+ elif s == 'median':
+ s_value_rank = calc_median(value_finite, self.subset_size)
elif s.startswith('f_'):
s_value = calc_frac(value, func)
else:
diff --git a/descqa/srv_s1s2.py b/descqa/srv_s1s2.py
index fc18e260..84d0567e 100644
--- a/descqa/srv_s1s2.py
+++ b/descqa/srv_s1s2.py
@@ -257,15 +257,7 @@ def run_on_single_catalog(self, catalog_instance, catalog_name, output_dir):
recvbuf={}
for quantity in quantities:
data_rank[quantity] = catalog_data[quantity]
- count = len(data_rank[quantity])
- tot_num = comm.reduce(count)
- counts = comm.allgather(count)
- if rank==0:
- recvbuf[quantity] = np.zeros(tot_num)
- else:
- recvbuf[quantity] = None
- displs = np.array([sum(counts[:p]) for p in range(size)])
- comm.Gatherv([data_rank[quantity],MPI.DOUBLE], [recvbuf[quantity], counts, displs, MPI.DOUBLE],root=0)
+            recvbuf[quantity] = send_to_master(data_rank[quantity],'double')  # NOTE(review): requires 'from .parallel import send_to_master' in this file; no import hunk for it appears in this patch
e1,e2 = shear_from_moments(recvbuf[self.Ixx+'_'+band],recvbuf[self.Ixy+'_'+band],recvbuf[self.Iyy+'_'+band])
e1psf,e2psf = shear_from_moments(recvbuf[self.IxxPSF+'_'+band],recvbuf[self.IxyPSF+'_'+band],recvbuf[self.IyyPSF+'_'+band])